List:Commits« Previous MessageNext Message »
From:konstantin Date:January 4 2006 9:39pm
Subject:bk commit into 5.1 tree (konstantin:1.2035)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of kostja. When kostja does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2035 06/01/05 00:38:51 konstantin@stripped +13 -0
  Merge mysql.com:/opt/local/work/mysql-5.0-merge
  into  mysql.com:/opt/local/work/mysql-5.1-merge

  sql/lock.cc
    1.83 06/01/05 00:38:46 konstantin@stripped +1 -1
    Manual merge.

  mysql-test/t/information_schema.test
    1.67 06/01/05 00:38:46 konstantin@stripped +0 -2
    Manual merge.

  mysql-test/r/information_schema.result
    1.97 06/01/05 00:38:46 konstantin@stripped +0 -0
    Manual merge.

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.9 06/01/05 00:36:23 konstantin@stripped +0 -1
    Auto merged

  sql/sql_show.cc
    1.295 06/01/05 00:36:23 konstantin@stripped +0 -0
    Auto merged

  sql/sql_cache.cc
    1.91 06/01/05 00:36:22 konstantin@stripped +0 -0
    Auto merged

  sql/sql_base.cc
    1.293 06/01/05 00:36:22 konstantin@stripped +0 -0
    Auto merged

  sql/sql_acl.cc
    1.170 06/01/05 00:36:22 konstantin@stripped +0 -0
    Auto merged

  sql/ha_innodb.cc
    1.248 06/01/05 00:36:22 konstantin@stripped +0 -0
    Auto merged

  mysql-test/t/rpl_stm_sp.test
    1.12 06/01/05 00:36:21 konstantin@stripped +0 -8
    Auto merged

  mysql-test/r/rpl_stm_sp.result
    1.16 06/01/05 00:36:21 konstantin@stripped +0 -3
    Auto merged

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.2.4.2 06/01/05 00:36:21 konstantin@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp -> storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp

  mysys/hash.c
    1.48 06/01/05 00:36:21 konstantin@stripped +0 -0
    Auto merged

  mysql-test/t/rpl_stm_sp.test
    1.9.1.2 06/01/05 00:36:21 konstantin@stripped +0 -0
    Merge rename: mysql-test/t/rpl_sp.test -> mysql-test/t/rpl_stm_sp.test

  mysql-test/r/rpl_stm_sp.result
    1.13.1.2 06/01/05 00:36:21 konstantin@stripped +0 -0
    Merge rename: mysql-test/r/rpl_sp.result -> mysql-test/r/rpl_stm_sp.result

  include/config-win.h
    1.74 06/01/05 00:36:21 konstantin@stripped +0 -4
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	konstantin
# Host:	dragonfly.local
# Root:	/opt/local/work/mysql-5.1-merge/RESYNC

--- 1.47/mysys/hash.c	2005-11-23 23:44:54 +03:00
+++ 1.48/mysys/hash.c	2006-01-05 00:36:21 +03:00
@@ -36,9 +36,10 @@
 
 static uint hash_mask(uint hashnr,uint buffmax,uint maxlength);
 static void movelink(HASH_LINK *array,uint pos,uint next_link,uint newlink);
-static int hashcmp(HASH *hash,HASH_LINK *pos,const byte *key,uint length);
+static int hashcmp(const HASH *hash, HASH_LINK *pos, const byte *key,
+                   uint length);
 
-static uint calc_hash(HASH *hash,const byte *key,uint length)
+static uint calc_hash(const HASH *hash, const byte *key, uint length)
 {
   ulong nr1=1, nr2=4;
   hash->charset->coll->hash_sort(hash->charset,(uchar*) key,length,&nr1,&nr2);
@@ -63,7 +64,6 @@
   hash->key_offset=key_offset;
   hash->key_length=key_length;
   hash->blength=1;
-  hash->current_record= NO_RECORD;		/* For the future */
   hash->get_key=get_key;
   hash->free=free_element;
   hash->flags=flags;
@@ -135,7 +135,6 @@
   reset_dynamic(&hash->array);
   /* Set row pointers so that the hash can be reused at once */
   hash->blength= 1;
-  hash->current_record= NO_RECORD;
   DBUG_VOID_RETURN;
 }
 
@@ -147,7 +146,8 @@
 */
 
 static inline char*
-hash_key(HASH *hash,const byte *record,uint *length,my_bool first)
+hash_key(const HASH *hash, const byte *record, uint *length,
+         my_bool first)
 {
   if (hash->get_key)
     return (*hash->get_key)(record,length,first);
@@ -163,8 +163,8 @@
   return (hashnr & ((buffmax >> 1) -1));
 }
 
-static uint hash_rec_mask(HASH *hash,HASH_LINK *pos,uint buffmax,
-			  uint maxlength)
+static uint hash_rec_mask(const HASH *hash, HASH_LINK *pos,
+                          uint buffmax, uint maxlength)
 {
   uint length;
   byte *key= (byte*) hash_key(hash,pos->data,&length,0);
@@ -186,14 +186,25 @@
 }
 
 
-	/* Search after a record based on a key */
-	/* Sets info->current_ptr to found record */
+gptr hash_search(const HASH *hash, const byte *key, uint length)
+{
+  HASH_SEARCH_STATE state;
+  return hash_first(hash, key, length, &state);
+}
+
+/*
+  Search after a record based on a key
+
+  NOTE
+   Assigns the number of the found record to HASH_SEARCH_STATE state
+*/
 
-gptr hash_search(HASH *hash,const byte *key,uint length)
+gptr hash_first(const HASH *hash, const byte *key, uint length,
+                HASH_SEARCH_STATE *current_record)
 {
   HASH_LINK *pos;
   uint flag,idx;
-  DBUG_ENTER("hash_search");
+  DBUG_ENTER("hash_first");
 
   flag=1;
   if (hash->records)
@@ -206,7 +217,7 @@
       if (!hashcmp(hash,pos,key,length))
       {
 	DBUG_PRINT("exit",("found key at %d",idx));
-	hash->current_record= idx;
+	*current_record= idx;
 	DBUG_RETURN (pos->data);
       }
       if (flag)
@@ -218,31 +229,32 @@
     }
     while ((idx=pos->next) != NO_RECORD);
   }
-  hash->current_record= NO_RECORD;
+  *current_record= NO_RECORD;
   DBUG_RETURN(0);
 }
 
 	/* Get next record with identical key */
 	/* Can only be called if previous calls was hash_search */
 
-gptr hash_next(HASH *hash,const byte *key,uint length)
+gptr hash_next(const HASH *hash, const byte *key, uint length,
+               HASH_SEARCH_STATE *current_record)
 {
   HASH_LINK *pos;
   uint idx;
 
-  if (hash->current_record != NO_RECORD)
+  if (*current_record != NO_RECORD)
   {
     HASH_LINK *data=dynamic_element(&hash->array,0,HASH_LINK*);
-    for (idx=data[hash->current_record].next; idx != NO_RECORD ; idx=pos->next)
+    for (idx=data[*current_record].next; idx != NO_RECORD ; idx=pos->next)
     {
       pos=data+idx;
       if (!hashcmp(hash,pos,key,length))
       {
-	hash->current_record= idx;
+	*current_record= idx;
 	return pos->data;
       }
     }
-    hash->current_record=NO_RECORD;
+    *current_record= NO_RECORD;
   }
   return 0;
 }
@@ -281,7 +293,8 @@
     != 0 key of record != key
  */
 
-static int hashcmp(HASH *hash,HASH_LINK *pos,const byte *key,uint length)
+static int hashcmp(const HASH *hash, HASH_LINK *pos, const byte *key,
+                   uint length)
 {
   uint rec_keylength;
   byte *rec_key= (byte*) hash_key(hash,pos->data,&rec_keylength,1);
@@ -307,7 +320,6 @@
   if (!(empty=(HASH_LINK*) alloc_dynamic(&info->array)))
     return(TRUE);				/* No more memory */
 
-  info->current_record= NO_RECORD;
   data=dynamic_element(&info->array,0,HASH_LINK*);
   halfbuff= info->blength >> 1;
 
@@ -450,7 +462,6 @@
   }
 
   if ( --(hash->records) < hash->blength >> 1) hash->blength>>=1;
-  hash->current_record= NO_RECORD;
   lastpos=data+hash->records;
 
   /* Remove link to record */
@@ -543,7 +554,6 @@
     if ((idx=pos->next) == NO_RECORD)
       DBUG_RETURN(1);			/* Not found in links */
   }
-  hash->current_record= NO_RECORD;
   org_link= *pos;
   empty=idx;
 
@@ -593,10 +603,10 @@
   isn't changed
 */
 
-void hash_replace(HASH *hash, uint idx, byte *new_row)
+void hash_replace(HASH *hash, HASH_SEARCH_STATE *current_record, byte *new_row)
 {
-  if (idx != NO_RECORD)				/* Safety */
-    dynamic_element(&hash->array,idx,HASH_LINK*)->data=new_row;
+  if (*current_record != NO_RECORD)            /* Safety */
+    dynamic_element(&hash->array, *current_record, HASH_LINK*)->data= new_row;
 }
 
 

--- 1.82/sql/lock.cc	2005-12-26 12:50:35 +03:00
+++ 1.83/sql/lock.cc	2006-01-05 00:38:46 +03:00
@@ -731,15 +731,16 @@
   char  key[MAX_DBKEY_LENGTH];
   char *db= table_list->db;
   uint  key_length;
+  HASH_SEARCH_STATE state;
   DBUG_ENTER("lock_table_name");
   DBUG_PRINT("enter",("db: %s  name: %s", db, table_list->table_name));
 
   key_length= create_table_def_key(thd, key, table_list, 0);
 
   /* Only insert the table if we haven't insert it already */
-  for (table=(TABLE*) hash_search(&open_cache,(byte*) key,key_length) ;
+  for (table=(TABLE*) hash_first(&open_cache, (byte*)key, key_length, &state);
        table ;
-       table = (TABLE*) hash_next(&open_cache,(byte*) key,key_length))
+       table = (TABLE*) hash_next(&open_cache,(byte*) key,key_length, &state))
   {
     if (table->in_use == thd)
     {

--- 1.169/sql/sql_acl.cc	2005-12-31 07:54:28 +03:00
+++ 1.170/sql/sql_acl.cc	2006-01-05 00:36:22 +03:00
@@ -2243,14 +2243,14 @@
   char helping [NAME_LEN*2+USERNAME_LENGTH+3];
   uint len;
   GRANT_NAME *grant_name,*found=0;
+  HASH_SEARCH_STATE state;
 
   len  = (uint) (strmov(strmov(strmov(helping,user)+1,db)+1,tname)-helping)+ 1;
-  for (grant_name=(GRANT_NAME*) hash_search(name_hash,
-					      (byte*) helping,
-					      len) ;
+  for (grant_name= (GRANT_NAME*) hash_first(name_hash, (byte*) helping,
+                                            len, &state);
        grant_name ;
        grant_name= (GRANT_NAME*) hash_next(name_hash,(byte*) helping,
-					     len))
+                                           len, &state))
   {
     if (exact)
     {

--- 1.292/sql/sql_base.cc	2005-12-31 07:54:30 +03:00
+++ 1.293/sql/sql_base.cc	2006-01-05 00:36:22 +03:00
@@ -1702,6 +1702,7 @@
   char	key[MAX_DBKEY_LENGTH];
   uint	key_length;
   char	*alias= table_list->alias;
+  HASH_SEARCH_STATE state;
   DBUG_ENTER("open_table");
 
   /* find a unused table in the open table cache */
@@ -1862,9 +1863,11 @@
   if (thd->handler_tables)
     mysql_ha_flush(thd, (TABLE_LIST*) NULL, MYSQL_HA_REOPEN_ON_USAGE, TRUE);
 
-  for (table=(TABLE*) hash_search(&open_cache,(byte*) key,key_length) ;
+  for (table= (TABLE*) hash_first(&open_cache, (byte*) key, key_length,
+                                  &state);
        table && table->in_use ;
-       table = (TABLE*) hash_next(&open_cache,(byte*) key,key_length))
+       table= (TABLE*) hash_next(&open_cache, (byte*) key, key_length,
+                                 &state))
   {
     if (table->s->version != refresh_version)
     {
@@ -2240,10 +2243,12 @@
     uint key_length= table->s->table_cache_key.length;
 
     DBUG_PRINT("loop", ("table_name: %s", table->alias));
-    for (TABLE *search=(TABLE*) hash_search(&open_cache,
-					    (byte*) key,key_length) ;
+    HASH_SEARCH_STATE state;
+    for (TABLE *search= (TABLE*) hash_first(&open_cache, (byte*) key,
+                                             key_length, &state);
 	 search ;
-	 search = (TABLE*) hash_next(&open_cache,(byte*) key,key_length))
+         search= (TABLE*) hash_next(&open_cache, (byte*) key,
+                                    key_length, &state))
     {
       DBUG_PRINT("info", ("share: 0x%lx  locked_by_flush: %d  "
                           "locked_by_name: %d  db_stat: %u  version: %u",
@@ -5848,11 +5853,14 @@
   key_length=(uint) (strmov(strmov(key,db)+1,table_name)-key)+1;
   for (;;)
   {
+    HASH_SEARCH_STATE state;
     result= signalled= 0;
 
-    for (table=(TABLE*) hash_search(&open_cache,(byte*) key,key_length) ;
+    for (table= (TABLE*) hash_first(&open_cache, (byte*) key, key_length,
+                                    &state);
          table;
-         table = (TABLE*) hash_next(&open_cache,(byte*) key,key_length))
+         table= (TABLE*) hash_next(&open_cache, (byte*) key, key_length,
+                                   &state))
     {
       THD *in_use;
       table->s->version=0L;		/* Free when thread is ready */

--- 1.90/sql/sql_cache.cc	2005-12-21 21:24:58 +03:00
+++ 1.91/sql/sql_cache.cc	2006-01-05 00:36:22 +03:00
@@ -3055,6 +3055,7 @@
   }
   case Query_cache_block::TABLE:
   {
+    HASH_SEARCH_STATE record_idx;
     DBUG_PRINT("qcache", ("block 0x%lx TABLE", (ulong) block));
     if (*border == 0)
       break;
@@ -3072,7 +3073,7 @@
     byte *key;
     uint key_length;
     key=query_cache_table_get_key((byte*) block, &key_length, 0);
-    hash_search(&tables, (byte*) key, key_length);
+    hash_first(&tables, (byte*) key, key_length, &record_idx);
 
     block->destroy();
     new_block->init(len);
@@ -3106,7 +3107,7 @@
     /* Fix pointer to table name */
     new_block->table()->table(new_block->table()->db() + tablename_offset);
     /* Fix hash to point at moved block */
-    hash_replace(&tables, tables.current_record, (byte*) new_block);
+    hash_replace(&tables, &record_idx, (byte*) new_block);
 
     DBUG_PRINT("qcache", ("moved %lu bytes to 0x%lx, new gap at 0x%lx",
 			len, (ulong) new_block, (ulong) *border));
@@ -3114,6 +3115,7 @@
   }
   case Query_cache_block::QUERY:
   {
+    HASH_SEARCH_STATE record_idx;
     DBUG_PRINT("qcache", ("block 0x%lx QUERY", (ulong) block));
     if (*border == 0)
       break;
@@ -3131,7 +3133,7 @@
     byte *key;
     uint key_length;
     key=query_cache_query_get_key((byte*) block, &key_length, 0);
-    hash_search(&queries, (byte*) key, key_length);
+    hash_first(&queries, (byte*) key, key_length, &record_idx);
     // Move table of used tables 
     memmove((char*) new_block->table(0), (char*) block->table(0),
 	   ALIGN_SIZE(n_tables*sizeof(Query_cache_block_table)));
@@ -3199,7 +3201,7 @@
       net->query_cache_query= (gptr) new_block;
     }
     /* Fix hash to point at moved block */
-    hash_replace(&queries, queries.current_record, (byte*) new_block);
+    hash_replace(&queries, &record_idx, (byte*) new_block);
     DBUG_PRINT("qcache", ("moved %lu bytes to 0x%lx, new gap at 0x%lx",
 			len, (ulong) new_block, (ulong) *border));
     break;

--- 1.294/sql/sql_show.cc	2005-12-31 11:26:17 +03:00
+++ 1.295/sql/sql_show.cc	2006-01-05 00:36:23 +03:00
@@ -2238,6 +2238,8 @@
   int error= 1;
   enum legacy_db_type not_used;
   Open_tables_state open_tables_state_backup;
+  bool save_view_prepare_mode= lex->view_prepare_mode;
+  lex->view_prepare_mode= TRUE;
   DBUG_ENTER("get_all_tables");
 
   LINT_INIT(end);
@@ -2423,6 +2425,7 @@
   lex->derived_tables= derived_tables;
   lex->all_selects_list= old_all_select_lex;
   lex->query_tables_last= save_query_tables_last;
+  lex->view_prepare_mode= save_view_prepare_mode;
   *save_query_tables_last= 0;
   lex->sql_command= save_sql_command;
   DBUG_RETURN(error);

--- 1.13.1.1/mysql-test/r/rpl_sp.result	2006-01-04 17:23:45 +03:00
+++ 1.16/mysql-test/r/rpl_stm_sp.result	2006-01-05 00:36:21 +03:00
@@ -264,7 +264,7 @@
 select * from t1;
 a
 1
-show binlog events in 'master-bin.000001' from 98;
+show binlog events in 'master-bin.000001' from 102;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
 master-bin.000001	#	Query	1	#	drop database if exists mysqltest1
 master-bin.000001	#	Query	1	#	create database mysqltest1

--- 1.9.1.1/mysql-test/t/rpl_sp.test	2006-01-04 17:23:45 +03:00
+++ 1.12/mysql-test/t/rpl_stm_sp.test	2006-01-05 00:36:21 +03:00
@@ -1,3 +1,6 @@
+# row-based and statement have expected binlog difference in result files
+-- source include/have_binlog_format_statement.inc
+
 # Test of replication of stored procedures (WL#2146 for MySQL 5.0)
 # Modified by WL#2971.
 
@@ -341,7 +344,7 @@
 insert into t1 values (1);
 select * from t1;
 --replace_column 2 # 5 #
-show binlog events in 'master-bin.000001' from 98;
+show binlog events in 'master-bin.000001' from 102;
 sync_slave_with_master;
 select * from t1;
 

--- 1.96/mysql-test/r/information_schema.result	2005-12-25 02:43:16 +03:00
+++ 1.97/mysql-test/r/information_schema.result	2006-01-05 00:38:46 +03:00
@@ -1077,6 +1077,22 @@
 32	32
 64	64
 drop table t1;
+CREATE TABLE t1 (f1 BIGINT, f2 VARCHAR(20), f3 BIGINT);
+INSERT INTO t1 SET f1 = 1, f2 = 'Schoenenbourg', f3 = 1;
+CREATE FUNCTION func2() RETURNS BIGINT RETURN 1;
+CREATE FUNCTION func1() RETURNS BIGINT
+BEGIN
+RETURN ( SELECT COUNT(*) FROM INFORMATION_SCHEMA.VIEWS);
+END//
+CREATE VIEW v1 AS SELECT 1 FROM t1
+WHERE f3 = (SELECT func2 ());
+SELECT func1();
+func1()
+1
+DROP TABLE t1;
+DROP VIEW v1;
+DROP FUNCTION func1;
+DROP FUNCTION func2;
 select * from information_schema.engines WHERE ENGINE="MyISAM";
 ENGINE	SUPPORT	COMMENT	TRANSACTIONS	XA	SAVEPOINTS
 MyISAM	ENABLED	Default engine as of MySQL 3.23 with great performance	NO	NO	NO

--- 1.66/mysql-test/t/information_schema.test	2005-12-24 20:13:37 +03:00
+++ 1.67/mysql-test/t/information_schema.test	2006-01-05 00:38:46 +03:00
@@ -769,6 +769,27 @@
 drop table t1;
 
 #
+# Bug#15533 crash, information_schema, function, view
+#
+CREATE TABLE t1 (f1 BIGINT, f2 VARCHAR(20), f3 BIGINT);
+INSERT INTO t1 SET f1 = 1, f2 = 'Schoenenbourg', f3 = 1;
+
+CREATE FUNCTION func2() RETURNS BIGINT RETURN 1;
+
+delimiter //;
+CREATE FUNCTION func1() RETURNS BIGINT
+BEGIN
+  RETURN ( SELECT COUNT(*) FROM INFORMATION_SCHEMA.VIEWS);
+END//
+delimiter ;//
+
+CREATE VIEW v1 AS SELECT 1 FROM t1
+                    WHERE f3 = (SELECT func2 ());
+SELECT func1();
+DROP TABLE t1;
+DROP VIEW v1;
+DROP FUNCTION func1;
+DROP FUNCTION func2;
 # Show engines
 #
 

--- 1.2.4.1/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2006-01-04 17:46:44 +03:00
+++ 1.9/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2006-01-05 00:36:23 +03:00
@@ -20,571 +20,646 @@
 #include <ndb_limits.h>
 #include <pc.hpp>
 #include <signaldata/TupCommit.hpp>
+#include "../dblqh/Dblqh.hpp"
 
 #define ljam() { jamLine(5000 + __LINE__); }
 #define ljamEntry() { jamEntryLine(5000 + __LINE__); }
 
-void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
-{
-  jamEntry();
-  OperationrecPtr loopOpPtr;
-  loopOpPtr.i = signal->theData[0];
-  Uint32 gci = signal->theData[1];
-  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  while (loopOpPtr.p->nextActiveOp != RNIL) {
-    ljam();
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  }//while
-  do {
-    Uint32 blockNo = refToBlock(loopOpPtr.p->userblockref);
-    ndbrequire(loopOpPtr.p->transstate == STARTED);
-    signal->theData[0] = loopOpPtr.p->userpointer;
-    signal->theData[1] = gci;
-    if (loopOpPtr.p->prevActiveOp == RNIL) {
-      ljam();
-      EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
-      return;
-    }//if
-    ljam();
-    EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
-    jamEntry();
-    loopOpPtr.i = loopOpPtr.p->prevActiveOp;
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  } while (true);
-}//Dbtup::execTUP_WRITELOG_REQ()
-
 void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
 {
   TablerecPtr regTabPtr;
   FragrecordPtr regFragPtr;
+  Uint32 frag_page_id, frag_id;
 
-  jamEntry();
+  ljamEntry();
 
-  Uint32 fragId = signal->theData[0];
-  regTabPtr.i = signal->theData[1];
-  Uint32 fragPageId = signal->theData[2];
-  Uint32 pageIndex = signal->theData[3];
+  frag_id= signal->theData[0];
+  regTabPtr.i= signal->theData[1];
+  frag_page_id= signal->theData[2];
+  Uint32 page_index= signal->theData[3];
 
   ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
-  getFragmentrec(regFragPtr, fragId, regTabPtr.p);
-  ndbrequire(regFragPtr.p != NULL);
+  
+  getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
+  ndbassert(regFragPtr.p != NULL);
+  
+  if (! (((frag_page_id << MAX_TUPLES_BITS) + page_index) == ~0))
+  {
+    Local_key tmp;
+    tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id); 
+    tmp.m_page_idx= page_index;
+    
+    PagePtr pagePtr;
+    Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
+    
+    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
+    {
+      ljam();
+      
+      if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
+      {
+	free_var_part(regFragPtr.p, regTabPtr.p,
+		      *(Var_part_ref*)ptr->get_var_part_ptr(regTabPtr.p),
+		      Var_page::CHAIN);
+      }
+      free_var_part(regFragPtr.p, regTabPtr.p, &tmp, (Var_page*)pagePtr.p, 0);
+    } else {
+      free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
+    }
+  }
+}
 
-  PagePtr pagePtr;
-  pagePtr.i = getRealpid(regFragPtr.p, fragPageId);
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 pageIndexScaled = pageIndex >> 1;
-  ndbrequire((pageIndex & 1) == 0);
-  Uint32 pageOffset = ZPAGE_HEADER_SIZE + 
-                     (regTabPtr.p->tupheadsize * pageIndexScaled);
-//---------------------------------------------------
-/* --- Deallocate a tuple as requested by ACC  --- */
-//---------------------------------------------------
-  if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) {
+void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
+{
+  jamEntry();
+  OperationrecPtr loopOpPtr;
+  loopOpPtr.i= signal->theData[0];
+  Uint32 gci= signal->theData[1];
+  c_operation_pool.getPtr(loopOpPtr);
+  while (loopOpPtr.p->prevActiveOp != RNIL) {
     ljam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_INSERT_TH,
-                        fragPageId,
-                        pageIndex,
-                        regTabPtr.i,
-                        fragId,
-                        regFragPtr.p->checkpointVersion);
-    cprAddData(signal,
-               regFragPtr.p,
-               pagePtr.i,
-               regTabPtr.p->tupheadsize,
-               pageOffset);
-  }//if
-  {
-    freeTh(regFragPtr.p,
-           regTabPtr.p,
-           signal,
-           pagePtr.p,
-           pageOffset);
+    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
+    c_operation_pool.getPtr(loopOpPtr);
   }
+  do {
+    ndbrequire(get_trans_state(loopOpPtr.p) == TRANS_STARTED);
+    signal->theData[0]= loopOpPtr.p->userpointer;
+    signal->theData[1]= gci;
+    if (loopOpPtr.p->nextActiveOp == RNIL) {
+      ljam();
+      EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 2);
+      return;
+    }
+    ljam();
+    EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 2);
+    jamEntry();
+    loopOpPtr.i= loopOpPtr.p->nextActiveOp;
+    c_operation_pool.getPtr(loopOpPtr);
+  } while (true);
 }
 
-/* ---------------------------------------------------------------- */
-/* ------------ PERFORM A COMMIT ON AN UPDATE OPERATION  ---------- */
-/* ---------------------------------------------------------------- */
-void Dbtup::commitUpdate(Signal* signal,
-                         Operationrec*  const regOperPtr,
-                         Fragrecord* const regFragPtr,
-                         Tablerec* const regTabPtr)
-{
-  if (regOperPtr->realPageIdC != RNIL) {
-    if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageIdC)) {
-/* ------------------------------------------------------------------------ */
-/* IF THE COPY WAS CREATED WITHIN THIS CHECKPOINT WE ONLY HAVE              */
-/* TO LOG THE CREATION OF THE COPY. IF HOWEVER IT WAS CREATED BEFORE  SAVE  */
-/* THIS CHECKPOINT, WE HAVE TO THE DATA AS WELL.                            */
-/* ------------------------------------------------------------------------ */
-      if (regOperPtr->undoLogged) {
-        ljam();
-        cprAddUndoLogRecord(signal,
-                            ZLCPR_TYPE_INSERT_TH_NO_DATA,
-                            regOperPtr->fragPageIdC,
-                            regOperPtr->pageIndexC,
-                            regOperPtr->tableRef,
-                            regOperPtr->fragId,
-                            regFragPtr->checkpointVersion);
-      } else {
-        ljam();
-        cprAddUndoLogRecord(signal,
-                            ZLCPR_TYPE_INSERT_TH,
-                            regOperPtr->fragPageIdC,
-                            regOperPtr->pageIndexC,
-                            regOperPtr->tableRef,
-                            regOperPtr->fragId,
-                            regFragPtr->checkpointVersion);
-        cprAddData(signal,
-                   regFragPtr,
-                   regOperPtr->realPageIdC,
-                   regTabPtr->tupheadsize,
-                   regOperPtr->pageOffsetC);
-      }//if
-    }//if
-
-    PagePtr copyPagePtr;
-    copyPagePtr.i = regOperPtr->realPageIdC;
-    ptrCheckGuard(copyPagePtr, cnoOfPage, page);
-    freeTh(regFragPtr,
-           regTabPtr,
-           signal,
-           copyPagePtr.p,
-           (Uint32)regOperPtr->pageOffsetC);
-    regOperPtr->realPageIdC = RNIL;
-    regOperPtr->fragPageIdC = RNIL;
-    regOperPtr->pageOffsetC = ZNIL;
-    regOperPtr->pageIndexC = ZNIL;
-  }//if
-}//Dbtup::commitUpdate()
-
-void
-Dbtup::commitSimple(Signal* signal,
-                    Operationrec* const regOperPtr,
-                    Fragrecord* const regFragPtr,
-                    Tablerec* const regTabPtr)
-{
-  operPtr.p = regOperPtr;
-  fragptr.p = regFragPtr;
-  tabptr.p = regTabPtr;
-
-  // Checking detached triggers
-  checkDetachedTriggers(signal,
-                        regOperPtr,
-                        regTabPtr);
+void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr,
+                               Tuple_header *tuple_ptr)
+{
+  OperationrecPtr raoOperPtr;
 
-  removeActiveOpList(regOperPtr);
-  if (regOperPtr->optype == ZUPDATE) {
-    ljam();
-    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-    if (regTabPtr->GCPIndicator) {
-      updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-    }//if
-  } else if (regOperPtr->optype == ZINSERT) {
-    ljam();
-    if (regTabPtr->GCPIndicator) {
-      updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-    }//if
-  } else {
-    ndbrequire(regOperPtr->optype == ZDELETE);
-  }//if
-}//Dbtup::commitSimple()
-
-void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr)
-{
-  if (regOperPtr->inActiveOpList == ZTRUE) {
-    OperationrecPtr raoOperPtr;
-    regOperPtr->inActiveOpList = ZFALSE;
-    if (regOperPtr->prevActiveOp != RNIL) {
+  /**
+   * Release copy tuple
+   */
+  if(regOperPtr->op_struct.op_type != ZDELETE && 
+     !regOperPtr->m_copy_tuple_location.isNull())
+    c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location);
+  
+  if (regOperPtr->op_struct.in_active_list) {
+    regOperPtr->op_struct.in_active_list= false;
+    if (regOperPtr->nextActiveOp != RNIL) {
       ljam();
-      raoOperPtr.i = regOperPtr->prevActiveOp;
-      ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
-      raoOperPtr.p->nextActiveOp = regOperPtr->nextActiveOp;
+      raoOperPtr.i= regOperPtr->nextActiveOp;
+      c_operation_pool.getPtr(raoOperPtr);
+      raoOperPtr.p->prevActiveOp= regOperPtr->prevActiveOp;
     } else {
       ljam();
-      PagePtr pagePtr;
-      pagePtr.i = regOperPtr->realPageId;
-      ptrCheckGuard(pagePtr, cnoOfPage, page);
-      ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-      pagePtr.p->pageWord[regOperPtr->pageOffset] = regOperPtr->nextActiveOp;
-    }//if
-    if (regOperPtr->nextActiveOp != RNIL) {
+      tuple_ptr->m_operation_ptr_i = regOperPtr->prevActiveOp;
+    }
+    if (regOperPtr->prevActiveOp != RNIL) {
       ljam();
-      raoOperPtr.i = regOperPtr->nextActiveOp;
-      ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
-      raoOperPtr.p->prevActiveOp = regOperPtr->prevActiveOp;
-    }//if
-    regOperPtr->prevActiveOp = RNIL;
-    regOperPtr->nextActiveOp = RNIL;
-  }//if
-}//Dbtup::removeActiveOpList()
+      raoOperPtr.i= regOperPtr->prevActiveOp;
+      c_operation_pool.getPtr(raoOperPtr);
+      raoOperPtr.p->nextActiveOp= regOperPtr->nextActiveOp;
+    }
+    regOperPtr->prevActiveOp= RNIL;
+    regOperPtr->nextActiveOp= RNIL;
+  }
+}
 
 /* ---------------------------------------------------------------- */
 /* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
 /* ---------------------------------------------------------------- */
-void Dbtup::initOpConnection(Operationrec* regOperPtr,
-			     Fragrecord * fragPtrP)
+void Dbtup::initOpConnection(Operationrec* regOperPtr)
 {
-  Uint32 RinFragList = regOperPtr->inFragList;
-  regOperPtr->transstate = IDLE;
-  regOperPtr->currentAttrinbufLen = 0;
-  regOperPtr->optype = ZREAD;
-  if (RinFragList == ZTRUE) {
-    OperationrecPtr tropNextLinkPtr;
-    OperationrecPtr tropPrevLinkPtr;
-/*----------------------------------------------------------------- */
-/*       TO ENSURE THAT WE HAVE SUCCESSFUL ABORTS OF FOLLOWING      */
-/*       OPERATIONS WHICH NEVER STARTED WE SET THE OPTYPE TO READ.  */
-/*----------------------------------------------------------------- */
-/*       REMOVE IT FROM THE DOUBLY LINKED LIST ON THE FRAGMENT      */
-/*----------------------------------------------------------------- */
-    tropPrevLinkPtr.i = regOperPtr->prevOprecInList;
-    tropNextLinkPtr.i = regOperPtr->nextOprecInList;
-    regOperPtr->inFragList = ZFALSE;
-    if (tropPrevLinkPtr.i == RNIL) {
-      ljam();
-      fragPtrP->firstusedOprec = tropNextLinkPtr.i;
-    } else {
-      ljam();
-      ptrCheckGuard(tropPrevLinkPtr, cnoOfOprec, operationrec);
-      tropPrevLinkPtr.p->nextOprecInList = tropNextLinkPtr.i;
-    }//if
-    if (tropNextLinkPtr.i == RNIL) {
-      fragPtrP->lastusedOprec = tropPrevLinkPtr.i;
-    } else {
-      ptrCheckGuard(tropNextLinkPtr, cnoOfOprec, operationrec);
-      tropNextLinkPtr.p->prevOprecInList = tropPrevLinkPtr.i;
-    }
-    regOperPtr->prevOprecInList = RNIL;
-    regOperPtr->nextOprecInList = RNIL;
-  }//if
-}//Dbtup::initOpConnection()
+  set_tuple_state(regOperPtr, TUPLE_ALREADY_ABORTED);
+  set_trans_state(regOperPtr, TRANS_IDLE);
+  regOperPtr->currentAttrinbufLen= 0;
+  regOperPtr->op_struct.op_type= ZREAD;
+  regOperPtr->op_struct.m_disk_preallocated= 0;
+  regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr->op_struct.m_wait_log_buffer= 0;
+  regOperPtr->m_undo_buffer_space= 0;
+}
 
-/* ----------------------------------------------------------------- */
-/* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
-/* ----------------------------------------------------------------- */
-void Dbtup::execTUP_COMMITREQ(Signal* signal) 
+void
+Dbtup::dealloc_tuple(Signal* signal,
+		     Uint32 gci,
+		     Page* page,
+		     Tuple_header* ptr, 
+		     Operationrec* regOperPtr, 
+		     Fragrecord* regFragPtr, 
+		     Tablerec* regTabPtr)
 {
-  FragrecordPtr regFragPtr;
-  OperationrecPtr regOperPtr;
-  TablerecPtr regTabPtr;
-
-  TupCommitReq * const tupCommitReq = (TupCommitReq *)signal->getDataPtr();
-
-  ljamEntry();
-  regOperPtr.i = tupCommitReq->opPtr;
-  ptrCheckGuard(regOperPtr, cnoOfOprec, operationrec);
-
-  ndbrequire(regOperPtr.p->transstate == STARTED);
-  regOperPtr.p->gci = tupCommitReq->gci;
-  regOperPtr.p->hashValue = tupCommitReq->hashValue;
+  if (ptr->m_header_bits & Tuple_header::DISK_PART)
+  {
+    Local_key disk;
+    memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
+    Ptr<GlobalPage> disk_page;
+    m_global_page_pool.getPtr(disk_page, 
+			      regOperPtr->m_commit_disk_callback_page);
+    disk_page_free(signal, regTabPtr, regFragPtr, 
+		   &disk, *(PagePtr*)&disk_page, gci);
+  }
+}
 
-  regFragPtr.i = regOperPtr.p->fragmentPtr;
-  ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
+static
+inline
+bool
+operator>=(const Local_key& key1, const Local_key& key2)
+{
+  return key1.m_page_no > key2.m_page_no ||
+    (key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
+}
 
-  regTabPtr.i = regOperPtr.p->tableRef;
-  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
+void
+Dbtup::commit_operation(Signal* signal,
+			Uint32 gci,
+			Tuple_header* tuple_ptr, 
+			Page* page,
+			Operationrec* regOperPtr, 
+			Fragrecord* regFragPtr, 
+			Tablerec* regTabPtr)
+{
+  ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
+  
+  Uint32 save= tuple_ptr->m_operation_ptr_i;
+  Uint32 bits= tuple_ptr->m_header_bits;
+
+  Tuple_header *disk_ptr= 0;
+  Tuple_header *copy= (Tuple_header*)
+    c_undo_buffer.get_ptr(&regOperPtr->m_copy_tuple_location);
+  
+  Uint32 copy_bits= copy->m_header_bits;
+
+  Uint32 fix_size= regTabPtr->m_offsets[MM].m_fix_header_size;
+  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
+  if(mm_vars == 0)
+  {
+    memcpy(tuple_ptr, copy, 4*fix_size);
+    //ndbout_c("commit: memcpy %p %p %d", tuple_ptr, copy, 4*fix_size);
+    disk_ptr= (Tuple_header*)(((Uint32*)copy)+fix_size);
+  }
+  else if(bits & Tuple_header::CHAINED_ROW)
+  {
+    Uint32 *ref= tuple_ptr->get_var_part_ptr(regTabPtr);
+    memcpy(tuple_ptr, copy, 4*(Tuple_header::HeaderSize+fix_size));
 
-  if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
-    ljam();
-    executeTuxCommitTriggers(signal,
-                             regOperPtr.p,
-                             regTabPtr.p);
+    Local_key tmp; tmp.assref(*ref);
+    if(0) printf("%p %d %d (%d bytes) - ref: %x ", tuple_ptr,
+	   regOperPtr->m_tuple_location.m_page_no,
+	   regOperPtr->m_tuple_location.m_page_idx,
+	   4*(Tuple_header::HeaderSize+fix_size),
+	   *ref);
+    Ptr<Var_page> vpagePtr;
+    Uint32 *dst= get_ptr(&vpagePtr, *(Var_part_ref*)ref);
+    Uint32 *src= copy->get_var_part_ptr(regTabPtr);
+    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)src)[mm_vars]);
+    ndbassert(4*vpagePtr.p->get_entry_len(tmp.m_page_idx) >= sz);
+    memcpy(dst, src, sz);
+    if(0) printf("ptr: %p %d ref: %x - chain commit", dst, sz, *ref);
+    copy_bits |= Tuple_header::CHAINED_ROW;
+    
+    if(0)
+    {
+      for(Uint32 i = 0; i<((sz+3)>>2); i++)
+	printf(" %.8x", src[i]);
+      printf("\n");
+    }
+    
+    if(copy_bits & Tuple_header::MM_SHRINK)
+    {
+      if(0) printf(" - shrink %d -> %d - ", 
+	     vpagePtr.p->get_entry_len(tmp.m_page_idx), (sz + 3) >> 2);
+      vpagePtr.p->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
+      if(0)ndbout_c("%p->shrink_entry(%d, %d)", vpagePtr.p, tmp.m_page_idx, 
+	       (sz + 3) >> 2);
+      update_free_page_list(regFragPtr, vpagePtr.p);
+    } 
+    if(0) ndbout_c("");
+    disk_ptr = (Tuple_header*)
+      (((Uint32*)copy)+Tuple_header::HeaderSize+fix_size+((sz + 3) >> 2));
+  } 
+  else 
+  {
+    Uint32 *var_part= copy->get_var_part_ptr(regTabPtr);
+    Uint32 sz= Tuple_header::HeaderSize + fix_size +
+      ((((mm_vars + 1) << 1) + (((Uint16*)var_part)[mm_vars]) + 3)>> 2);
+    ndbassert(((Var_page*)page)->
+	      get_entry_len(regOperPtr->m_tuple_location.m_page_idx) >= sz);
+    memcpy(tuple_ptr, copy, 4*sz);      
+    if(0) ndbout_c("%p %d %d (%d bytes)", tuple_ptr,
+	     regOperPtr->m_tuple_location.m_page_no,
+	     regOperPtr->m_tuple_location.m_page_idx,
+	     4*sz);
+    if(copy_bits & Tuple_header::MM_SHRINK)
+    {
+      ((Var_page*)page)->shrink_entry(regOperPtr->m_tuple_location.m_page_idx, 
+				      sz);
+      if(0)ndbout_c("%p->shrink_entry(%d, %d)", 
+	       page, regOperPtr->m_tuple_location.m_page_idx, sz);
+      update_free_page_list(regFragPtr, (Var_page*)page);
+    } 
+    disk_ptr = (Tuple_header*)(((Uint32*)copy)+sz);
+  }
+  
+  if (regTabPtr->m_no_of_disk_attributes &&
+      (copy_bits & Tuple_header::DISK_INLINE))
+  {
+    Local_key key;
+    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
+    Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
+
+    PagePtr pagePtr = *(PagePtr*)&m_pgman.m_ptr;
+    ndbassert(pagePtr.p->m_page_no == key.m_page_no);
+    ndbassert(pagePtr.p->m_file_no == key.m_file_no);
+    Uint32 sz, *dst;
+    if(copy_bits & Tuple_header::DISK_ALLOC)
+    {
+      disk_page_alloc(signal, regTabPtr, regFragPtr, &key, pagePtr, gci);
+
+      if(lcpScan_ptr_i != RNIL)
+      {
+	ScanOpPtr scanOp;
+	c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
+	Local_key rowid = regOperPtr->m_tuple_location;
+	Local_key scanpos = scanOp.p->m_scanPos.m_key;
+	rowid.m_page_no = pagePtr.p->frag_page_id;
+	if(rowid >= scanpos)
+	{
+	  copy_bits |= Tuple_header::LCP_SKIP;
+	}
+      }
+    }
+    
+    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+    {
+      sz= regTabPtr->m_offsets[DD].m_fix_header_size;
+      dst= ((Fix_page*)pagePtr.p)->get_ptr(key.m_page_idx, sz);
+    }
+    else
+    {
+      dst= ((Var_page*)pagePtr.p)->get_ptr(key.m_page_idx);
+      sz= ((Var_page*)pagePtr.p)->get_entry_len(key.m_page_idx);
+    }
+    
+    if(! (copy_bits & Tuple_header::DISK_ALLOC))
+    {
+      disk_page_undo_update(pagePtr.p, &key, dst, sz, gci, logfile_group_id);
+    }
+    
+    memcpy(dst, disk_ptr, 4*sz);
+    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
+    
+    ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
+    copy_bits |= Tuple_header::DISK_PART;
   }
 
-  if (regOperPtr.p->tupleState == NO_OTHER_OP) {
-    if ((regOperPtr.p->prevActiveOp == RNIL) &&
-        (regOperPtr.p->nextActiveOp == RNIL)) {
-      ljam();
-/* ---------------------------------------------------------- */
-// We handle the simple case separately as an optimisation
-/* ---------------------------------------------------------- */
-      commitSimple(signal,
-                   regOperPtr.p,
-                   regFragPtr.p,
-                   regTabPtr.p);
-    } else {
-/* ---------------------------------------------------------- */
-// This is the first commit message of this record in this
-// transaction. We will commit this record completely for this
-// transaction. If there are other operations they will be
-// responsible to release their own resources. Also commit of
-// a delete is postponed until the last operation is committed
-// on the tuple.
-//
-// As part of this commitRecord we will also handle detached
-// triggers and release of resources for this operation.
-/* ---------------------------------------------------------- */
-      ljam();
-      commitRecord(signal,
-                   regOperPtr.p,
-                   regFragPtr.p,
-                   regTabPtr.p);
-      removeActiveOpList(regOperPtr.p);
-    }//if
-  } else {
-    ljam();
-/* ---------------------------------------------------------- */
-// Release any copy tuples
-/* ---------------------------------------------------------- */
-    ndbrequire(regOperPtr.p->tupleState == TO_BE_COMMITTED);
-    commitUpdate(signal, regOperPtr.p, regFragPtr.p, regTabPtr.p);
-    removeActiveOpList(regOperPtr.p);
-  }//if
-  initOpConnection(regOperPtr.p, regFragPtr.p);
-}//execTUP_COMMITREQ()
+  
+  Uint32 clear= 
+    Tuple_header::ALLOC |
+    Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE | 
+    Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN;
+  copy_bits &= ~(Uint32)clear;
+  
+  tuple_ptr->m_header_bits= copy_bits;
+  tuple_ptr->m_operation_ptr_i= save;
+  
+  if (regTabPtr->checksumIndicator) {
+    jam();
+    setChecksum(tuple_ptr, regTabPtr);
+  }
+}
 
 void
-Dbtup::updateGcpId(Signal* signal,
-                   Operationrec* const regOperPtr,
-                   Fragrecord* const regFragPtr,
-                   Tablerec* const regTabPtr)
-{
-  PagePtr pagePtr;
-  ljam();
-//--------------------------------------------------------------------
-// Is this code safe for UNDO logging. Not sure currently. RONM
-//--------------------------------------------------------------------
-  pagePtr.i = regOperPtr->realPageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
-  Uint32 temp = regOperPtr->pageOffset + regTabPtr->tupGCPIndex;
-  ndbrequire((temp < ZWORDS_ON_PAGE) &&
-             (regTabPtr->tupGCPIndex < regTabPtr->tupheadsize));
-  if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
-    Uint32 prevGCI = pagePtr.p->pageWord[temp];
-    ljam();
-    cprAddUndoLogRecord(signal,
-                        ZLCPR_TYPE_UPDATE_GCI,
-                        regOperPtr->fragPageId,
-                        regOperPtr->pageIndex,
-                        regOperPtr->tableRef,
-                        regOperPtr->fragId,
-                        regFragPtr->checkpointVersion);
-    cprAddGCIUpdate(signal,
-                    prevGCI,
-                    regFragPtr);
-  }//if
-  pagePtr.p->pageWord[temp] = regOperPtr->gci;
-  if (regTabPtr->checksumIndicator) {
-    ljam();
-    setChecksum(pagePtr.p, regOperPtr->pageOffset, regTabPtr->tupheadsize);
-  }//if
-}//Dbtup::updateGcpId()
+Dbtup::disk_page_commit_callback(Signal* signal, 
+				 Uint32 opPtrI, Uint32 page_id)
+{
+  Uint32 hash_value;
+  Uint32 gci;
+  OperationrecPtr regOperPtr;
+
+  ljamEntry();
+  
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+  
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci= gci;
+
+  regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  regOperPtr.p->m_commit_disk_callback_page= page_id;
+  m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
+  
+  execTUP_COMMITREQ(signal);
+  if(signal->theData[0] == 0)
+    c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+}
 
 void
-Dbtup::commitRecord(Signal* signal,
-                    Operationrec* const regOperPtr,
-                    Fragrecord* const regFragPtr,
-                    Tablerec* const regTabPtr)
+Dbtup::disk_page_log_buffer_callback(Signal* signal, 
+				     Uint32 opPtrI,
+				     Uint32 unused)
 {
-  Uint32 opType;
-  OperationrecPtr firstOpPtr;
-  PagePtr pagePtr;
+  Uint32 hash_value;
+  Uint32 gci;
+  OperationrecPtr regOperPtr;
 
-  pagePtr.i = regOperPtr->realPageId;
-  ptrCheckGuard(pagePtr, cnoOfPage, page);
+  ljamEntry();
+  
+  c_operation_pool.getPtr(regOperPtr, opPtrI);
+  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
+
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
+  
+  tupCommitReq->opPtr= opPtrI;
+  tupCommitReq->hashValue= hash_value;
+  tupCommitReq->gci= gci;
+
+  Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
+  ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
+  regOperPtr.p->op_struct.m_wait_log_buffer= 0;
+  m_global_page_pool.getPtr(m_pgman.m_ptr, page);
+  
+  execTUP_COMMITREQ(signal);
+  ndbassert(signal->theData[0] == 0);
+  
+  c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+}
 
-  setTupleStatesSetOpType(regOperPtr, pagePtr.p, opType, firstOpPtr);
+void
+Dbtup::fix_commit_order(OperationrecPtr opPtr)
+{
+  ndbassert(!opPtr.p->is_first_operation());
+  OperationrecPtr firstPtr = opPtr;
+  while(firstPtr.p->prevActiveOp != RNIL)
+  {
+    firstPtr.i = firstPtr.p->prevActiveOp;
+    c_operation_pool.getPtr(firstPtr);    
+  }
 
-  fragptr.p = regFragPtr;
-  tabptr.p = regTabPtr;
+  ndbout_c("fix_commit_order (swapping %d and %d)",
+	   opPtr.i, firstPtr.i);
+  
+  /**
+   * Swap data between first and curr
+   */
+  Uint32 prev= opPtr.p->prevActiveOp;
+  Uint32 next= opPtr.p->nextActiveOp;
+  Uint32 seco= firstPtr.p->nextActiveOp;
+
+  Operationrec tmp = *opPtr.p;
+  * opPtr.p = * firstPtr.p;
+  * firstPtr.p = tmp;
+
+  c_operation_pool.getPtr(seco)->prevActiveOp = opPtr.i;
+  c_operation_pool.getPtr(prev)->nextActiveOp = firstPtr.i;
+  if(next != RNIL)
+    c_operation_pool.getPtr(next)->prevActiveOp = firstPtr.i;
+}
 
-  if (opType == ZINSERT_DELETE) {
-    ljam();
-//--------------------------------------------------------------------
-// We started by inserting the tuple and ended by deleting. Seen from
-// transactions point of view no changes were made.
-//--------------------------------------------------------------------
-    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-    return;
-  } else if (opType == ZINSERT) {
-    ljam();
-//--------------------------------------------------------------------
-// We started by inserting whereafter we made several changes to the
-// tuple that could include updates, deletes and new inserts. The final
-// state of the tuple is the original tuple. This is reached from this
-// operation. We change the optype on this operation to ZINSERT to
-// ensure proper operation of the detached trigger.
-// We restore the optype after executing triggers although not really
-// needed.
-//--------------------------------------------------------------------
-    Uint32 saveOpType = regOperPtr->optype;
-    regOperPtr->optype = ZINSERT;
-    operPtr.p = regOperPtr;
-
-    checkDetachedTriggers(signal,
-                          regOperPtr,
-                          regTabPtr);
+/* ----------------------------------------------------------------- */
+/* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
+/* ----------------------------------------------------------------- */
+void Dbtup::execTUP_COMMITREQ(Signal* signal) 
+{
+  FragrecordPtr regFragPtr;
+  OperationrecPtr regOperPtr;
+  TablerecPtr regTabPtr;
+  KeyReqStruct req_struct;
+  TransState trans_state;
+  Uint32 no_of_fragrec, no_of_tablerec, hash_value, gci;
 
-    regOperPtr->optype = saveOpType;
-  } else if (opType == ZUPDATE) {
-    ljam();
-//--------------------------------------------------------------------
-// We want to use the first operation which contains a copy tuple
-// reference. This operation contains the before value of this record
-// for this transaction. Then this operation is used for executing
-// triggers with optype set to update.
-//--------------------------------------------------------------------
-    OperationrecPtr befOpPtr;
-    findBeforeValueOperation(befOpPtr, firstOpPtr);
-
-    Uint32 saveOpType = befOpPtr.p->optype;
-    Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
-    Bitmask<MAXNROFATTRIBUTESINWORDS> saveAttributeMask;
-
-    calculateChangeMask(pagePtr.p,
-                        regTabPtr,
-                        befOpPtr.p->pageOffset,
-                        attributeMask);
-
-    saveAttributeMask.clear();
-    saveAttributeMask.bitOR(befOpPtr.p->changeMask);
-    befOpPtr.p->changeMask.clear();
-    befOpPtr.p->changeMask.bitOR(attributeMask);
-    befOpPtr.p->gci = regOperPtr->gci;
-    
-    befOpPtr.p->optype = opType;
-    operPtr.p = befOpPtr.p;
-    checkDetachedTriggers(signal,
-                          befOpPtr.p,
-                          regTabPtr);
+  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
 
-    befOpPtr.p->changeMask.clear();
-    befOpPtr.p->changeMask.bitOR(saveAttributeMask);
+  regOperPtr.i= tupCommitReq->opPtr;
+  ljamEntry();
 
-    befOpPtr.p->optype = saveOpType;
-  } else if (opType == ZDELETE) {
-    ljam();
-//--------------------------------------------------------------------
-// We want to use the first operation which contains a copy tuple.
-// We benefit from the fact that we know that it cannot be a simple
-// delete and it cannot be an insert followed by a delete. Thus there
-// must either be an update or a insert following a delete. In both
-// cases we will find a before value in a copy tuple.
-//
-// An added complexity is that the trigger handling assumes that the
-// before value is located in the original tuple so we have to move the
-// copy tuple reference to the original tuple reference and afterwards
-// restore it again.
-//--------------------------------------------------------------------
-    OperationrecPtr befOpPtr;
-    findBeforeValueOperation(befOpPtr, firstOpPtr);
-    Uint32 saveOpType = befOpPtr.p->optype;
-
-    Uint32 realPageId = befOpPtr.p->realPageId;
-    Uint32 pageOffset = befOpPtr.p->pageOffset;
-    Uint32 fragPageId = befOpPtr.p->fragPageId;
-    Uint32 pageIndex  = befOpPtr.p->pageIndex;
-
-    befOpPtr.p->realPageId = befOpPtr.p->realPageIdC;
-    befOpPtr.p->pageOffset = befOpPtr.p->pageOffsetC;
-    befOpPtr.p->fragPageId = befOpPtr.p->fragPageIdC;
-    befOpPtr.p->pageIndex  = befOpPtr.p->pageIndexC;
-    befOpPtr.p->gci = regOperPtr->gci;
-
-    befOpPtr.p->optype = opType;
-    operPtr.p = befOpPtr.p;
-    checkDetachedTriggers(signal,
-                          befOpPtr.p,
-                          regTabPtr);
-
-    befOpPtr.p->realPageId = realPageId;
-    befOpPtr.p->pageOffset = pageOffset;
-    befOpPtr.p->fragPageId = fragPageId;
-    befOpPtr.p->pageIndex  = pageIndex;
-    befOpPtr.p->optype     = saveOpType;
-  } else {
-    ndbrequire(false);
-  }//if
+  c_operation_pool.getPtr(regOperPtr);
+  if(!regOperPtr.p->is_first_operation())
+  {
+    /**
+     * Out of order commit   XXX check effect on triggers
+     */
+    fix_commit_order(regOperPtr);
+  }
+  ndbassert(regOperPtr.p->is_first_operation());
+  
+  regFragPtr.i= regOperPtr.p->fragmentPtr;
+  trans_state= get_trans_state(regOperPtr.p);
+
+  no_of_fragrec= cnoOfFragrec;
+
+  ndbrequire(trans_state == TRANS_STARTED);
+  ptrCheckGuard(regFragPtr, no_of_fragrec, fragrecord);
+
+  no_of_tablerec= cnoOfTablerec;
+  regTabPtr.i= regFragPtr.p->fragTableId;
+  hash_value= tupCommitReq->hashValue;
+  gci= tupCommitReq->gci;
+
+  req_struct.signal= signal;
+  req_struct.hash_value= hash_value;
+  req_struct.gci= gci;
+
+  ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
+
+  PagePtr page;
+  Tuple_header* tuple_ptr= 0;
+  if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
+  {
+    ndbassert(regOperPtr.p->is_first_operation() && 
+	      regOperPtr.p->is_last_operation());
 
-  commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
-  if (regTabPtr->GCPIndicator) {
-    updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
-  }//if
-}//Dbtup::commitRecord()
+    Page_cache_client::Request req;
+    /**
+     * Check for page
+     */
+    if(!regOperPtr.p->m_copy_tuple_location.isNull())
+    {
+      Tuple_header* tmp= (Tuple_header*)
+	c_undo_buffer.get_ptr(&regOperPtr.p->m_copy_tuple_location);
+      
+      memcpy(&req.m_page, 
+	     tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+    } 
+    else
+    {
+      // initial delete
+      ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
+      tuple_ptr= (Tuple_header*)
+	get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
+      memcpy(&req.m_page, 
+	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
+    }
+    req.m_callback.m_callbackData= regOperPtr.i;
+    req.m_callback.m_callbackFunction = 
+      safe_cast(&Dbtup::disk_page_commit_callback);
+
+    int flags= regOperPtr.p->op_struct.op_type |
+      Page_cache_client::COMMIT_REQ | Page_cache_client::STRICT_ORDER;
+    int res= m_pgman.get_page(signal, req, flags);
+    switch(res){
+    case 0:
+      /**
+       * Timeslice
+       */
+      signal->theData[0] = 1;
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+    regOperPtr.p->m_commit_disk_callback_page= res;
+    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
+  } 
+  
+  if(regOperPtr.p->op_struct.m_wait_log_buffer)
+  {
+    ndbassert(regOperPtr.p->is_first_operation() && 
+	      regOperPtr.p->is_last_operation());
+    
+    Callback cb;
+    cb.m_callbackData= regOperPtr.i;
+    cb.m_callbackFunction = 
+      safe_cast(&Dbtup::disk_page_log_buffer_callback);
+    Uint32 sz= regOperPtr.p->m_undo_buffer_space;
+    
+    Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
+    int res= lgman.get_log_buffer(signal, sz, &cb);
+    switch(res){
+    case 0:
+      signal->theData[0] = 1;
+      return;
+    case -1:
+      ndbrequire("NOT YET IMPLEMENTED" == 0);
+      break;
+    }
+  }
+  
+  if(!tuple_ptr)
+  {
+    req_struct.m_tuple_ptr= tuple_ptr = (Tuple_header*)
+      get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p);
+  }
+  
+  if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
+  {
+    /**
+     * Execute all tux triggers at first commit
+     *   since previous tuple is otherwise removed...
+     *   btw...is this a "good" solution??
+     *   
+     *   why can't we instead remove "own version" (when approriate ofcourse)
+     */
+    if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
+      ljam();
+      OperationrecPtr loopPtr= regOperPtr;
+      while(loopPtr.i != RNIL)
+      {
+	c_operation_pool.getPtr(loopPtr);
+	executeTuxCommitTriggers(signal,
+				 loopPtr.p,
+				 regFragPtr.p,
+				 regTabPtr.p);
+	set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
+	loopPtr.i = loopPtr.p->nextActiveOp;
+      }
+    }
+  }
+  
+  if(regOperPtr.p->is_last_operation())
+  {
+    /**
+     * Perform "real" commit
+     */
+    set_change_mask_info(&req_struct, regOperPtr.p);
+    checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p);
+    
+    if(regOperPtr.p->op_struct.op_type != ZDELETE)
+    {
+      commit_operation(signal, gci, tuple_ptr, page.p,
+		       regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+      removeActiveOpList(regOperPtr.p, tuple_ptr);
+    }
+    else
+    {
+      removeActiveOpList(regOperPtr.p, tuple_ptr);
+      dealloc_tuple(signal, gci, page.p, tuple_ptr, 
+		    regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+    }
+  } 
+  else
+  {
+    removeActiveOpList(regOperPtr.p, tuple_ptr);   
+  }
+  
+  initOpConnection(regOperPtr.p);
+  signal->theData[0] = 0;
+}
 
 void
-Dbtup::setTupleStatesSetOpType(Operationrec* const regOperPtr,
-                               Page* const pagePtr,
-                               Uint32& opType,
-                               OperationrecPtr& firstOpPtr)
+Dbtup::set_change_mask_info(KeyReqStruct * const req_struct,
+                            Operationrec * const regOperPtr)
 {
-  OperationrecPtr loopOpPtr;
-  OperationrecPtr lastOpPtr;
-
-  ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
-  loopOpPtr.i = pagePtr->pageWord[regOperPtr->pageOffset];
-  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-  lastOpPtr = loopOpPtr;
-  if (loopOpPtr.p->optype == ZDELETE) {
-    ljam();
-    opType = ZDELETE;
-  } else {
-    ljam();
-    opType = ZUPDATE;
-  }//if
-  do {
-    ljam();
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    firstOpPtr = loopOpPtr;
-    loopOpPtr.p->tupleState = TO_BE_COMMITTED;
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
-  } while (loopOpPtr.i != RNIL);
-  if (opType == ZDELETE) {
-    ljam();
-    if (firstOpPtr.p->optype == ZINSERT) {
-      ljam();
-      opType = ZINSERT_DELETE;
-    }//if
-  } else {
+  ChangeMaskState change_mask= get_change_mask_state(regOperPtr);
+  if (change_mask == USE_SAVED_CHANGE_MASK) {
     ljam();
-    if (firstOpPtr.p->optype == ZINSERT) {
-      ljam();
-      opType = ZINSERT;
-    }//if
-  }///if
-}//Dbtup::setTupleStatesSetOpType()
-
-void Dbtup::findBeforeValueOperation(OperationrecPtr& befOpPtr,
-                                     OperationrecPtr firstOpPtr)
-{
-  befOpPtr = firstOpPtr;
-  if (befOpPtr.p->realPageIdC != RNIL) {
+    req_struct->changeMask.setWord(0, regOperPtr->saved_change_mask[0]);
+    req_struct->changeMask.setWord(1, regOperPtr->saved_change_mask[1]);
+    //get saved state
+  } else if (change_mask == RECALCULATE_CHANGE_MASK) {
+    ljam();
+    //Recompute change mask, for now set all bits
+    req_struct->changeMask.set();
+  } else if (change_mask == SET_ALL_MASK) {
     ljam();
-    return;
+    req_struct->changeMask.set();
   } else {
     ljam();
-    befOpPtr.i = befOpPtr.p->prevActiveOp;
-    ptrCheckGuard(befOpPtr, cnoOfOprec, operationrec);
-    ndbrequire(befOpPtr.p->realPageIdC != RNIL);
-  }//if
-}//Dbtup::findBeforeValueOperation()
+    ndbrequire(change_mask == DELETE_CHANGES);
+  }
+}
 
 void
 Dbtup::calculateChangeMask(Page* const pagePtr,
                            Tablerec* const regTabPtr,
-                           Uint32 pageOffset,
-                           Bitmask<MAXNROFATTRIBUTESINWORDS>& attributeMask)
+                           KeyReqStruct * const req_struct)
 {
   OperationrecPtr loopOpPtr;
-
-  attributeMask.clear();
-  ndbrequire(pageOffset < ZWORDS_ON_PAGE);
-  loopOpPtr.i = pagePtr->pageWord[pageOffset];
+  Uint32 saved_word1= 0;
+  Uint32 saved_word2= 0;
+  loopOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
   do {
-    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
-    if (loopOpPtr.p->optype == ZUPDATE) {
+    c_operation_pool.getPtr(loopOpPtr);
+    ndbrequire(loopOpPtr.p->op_struct.op_type == ZUPDATE);
+    ChangeMaskState change_mask= get_change_mask_state(loopOpPtr.p);
+    if (change_mask == USE_SAVED_CHANGE_MASK) {
+      ljam();
+      saved_word1|= loopOpPtr.p->saved_change_mask[0];
+      saved_word2|= loopOpPtr.p->saved_change_mask[1];
+    } else if (change_mask == RECALCULATE_CHANGE_MASK) {
       ljam();
-      attributeMask.bitOR(loopOpPtr.p->changeMask);
-    } else if (loopOpPtr.p->optype == ZINSERT) {
-      ljam();
-      attributeMask.set();
+      //Recompute change mask, for now set all bits
+      req_struct->changeMask.set();
       return;
     } else {
-      ndbrequire(loopOpPtr.p->optype == ZDELETE);
-    }//if
-    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
+      ndbrequire(change_mask == SET_ALL_MASK);
+      ljam();
+      req_struct->changeMask.set();
+      return;
+    }
+    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
   } while (loopOpPtr.i != RNIL);
-}//Dbtup::calculateChangeMask()
+  req_struct->changeMask.setWord(0, saved_word1);
+  req_struct->changeMask.setWord(1, saved_word2);
+}
Thread
bk commit into 5.1 tree (konstantin:1.2035)konstantin4 Jan