List:Commits« Previous MessageNext Message »
From:Stewart Smith Date:October 13 2008 8:55am
Subject:bzr push into mysql-5.1 branch (stewart:3000 to 3001)
View as plain text  
 3001 Stewart Smith	2008-10-13 [merge]
      merge mainline 6.4
modified:
  configure.in
  mysql-test/mysql-test-run.pl
  mysql-test/ndb/ndbcluster.sh
  mysql-test/suite/ndb/r/ndb_dd_alter.result
  sql/ha_ndbcluster.cc
  sql/ha_ndbcluster_binlog.cc
  sql/ha_ndbcluster_lock_ext.h
  storage/ndb/src/common/transporter/TCP_Transporter.cpp
  storage/ndb/src/common/transporter/Transporter.cpp
  storage/ndb/src/common/transporter/Transporter.hpp
  storage/ndb/src/common/util/ndb_opts.c
  storage/ndb/src/kernel/blocks/backup/Backup.cpp
  storage/ndb/src/kernel/blocks/backup/Backup.hpp
  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
  storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
  storage/ndb/src/kernel/blocks/suma/Suma.cpp
  storage/ndb/src/kernel/blocks/suma/Suma.hpp
  storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
  storage/ndb/src/kernel/vm/VMSignal.hpp
  storage/ndb/src/kernel/vm/mt-asm.h
  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
  storage/ndb/test/src/NDBT_Tables.cpp

 3000 Stewart Smith	2008-10-13
       display complete set of FSOPENREQ flags in signal log (and in correct order)
modified:
  storage/ndb/src/common/debugger/signaldata/FsOpenReq.cpp

=== modified file 'configure.in'
--- a/configure.in	2008-10-08 20:24:20 +0000
+++ b/configure.in	2008-10-11 09:22:33 +0000
@@ -2061,7 +2061,7 @@ AC_CHECK_FUNCS(alarm bcmp bfill bmove bs
   snprintf socket stpcpy strcasecmp strerror strsignal strnlen strpbrk strstr \
   strtol strtoll strtoul strtoull tell tempnam thr_setconcurrency vidattr \
   backtrace backtrace_symbols backtrace_symbols_fd \
-  posix_fallocate posix_memalign memalign)
+  posix_fallocate posix_memalign memalign sysconf)
 
 #
 #

=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl	2008-10-03 12:04:01 +0000
+++ b/mysql-test/mysql-test-run.pl	2008-10-09 20:11:31 +0000
@@ -2842,7 +2842,7 @@ sub ndbd_start ($$$) {
   mtr_add_arg($args, "$extra_args");
 
   my $nodeid= $cluster->{'ndbds'}->[$idx]->{'nodeid'};
-  my $path_ndbd_log= "$cluster->{'data_dir'}/ndb_${nodeid}.log";
+  my $path_ndbd_log= "$cluster->{'data_dir'}/ndb_${nodeid}_out.log";
 
   my $exe= $exe_ndbd;
   if ($exe_ndbmtd and ($exe_ndbmtd_counter++ % 2) == 0)

=== modified file 'mysql-test/ndb/ndbcluster.sh'
--- a/mysql-test/ndb/ndbcluster.sh	2007-03-21 07:02:11 +0000
+++ b/mysql-test/ndb/ndbcluster.sh	2008-10-09 20:04:10 +0000
@@ -31,7 +31,7 @@ cd $CWD
 if [ -d ../sql ] ; then
    SOURCE_DIST=1
    ndbtop=$BASEDIR/storage/ndb
-   exec_ndb=$ndbtop/src/kernel/ndbd
+   exec_ndb=$ndbtop/src/kernel
    exec_mgmtsrvr=$ndbtop/src/mgmsrv/ndb_mgmd
    exec_waiter=$ndbtop/tools/ndb_waiter
    exec_test=$ndbtop/tools/ndb_test_platform
@@ -41,10 +41,10 @@ else
    BINARY_DIST=1
    if test -x "$BASEDIR/libexec/ndbd"
    then
-     exec_ndb=$BASEDIR/libexec/ndbd
+     exec_ndb=$BASEDIR/libexec
      exec_mgmtsrvr=$BASEDIR/libexec/ndb_mgmd
    else
-     exec_ndb=$BASEDIR/bin/ndbd
+     exec_ndb=$BASEDIR/bin
      exec_mgmtsrvr=$BASEDIR/bin/ndb_mgmd
    fi
    exec_waiter=$BASEDIR/bin/ndb_waiter
@@ -75,6 +75,7 @@ ndb_diskless=0
 ndbd_nodes=2
 relative_config_data_dir=
 opt_core=
+opt_exec_ndb=ndbd
 
 ndb_no_ord=512
 ndb_no_attr=2048
@@ -82,6 +83,7 @@ ndb_con_op=105000
 ndb_dmem=80M
 ndb_imem=24M
 ndb_pbmem=32M
+ndb_threads=4
 
 VERBOSE=100
 NDB_MGM_EXTRA_OPTS=
@@ -144,6 +146,13 @@ while test $# -gt 0; do
     --character-sets-dir=*)
      CHARSETSDIR=`echo "$1" | sed -e "s;--character-sets-dir=;;"`
      ;;
+    --mt=*)
+     ndb_threads=`echo "$1" | sed -e "s;--mt=;;"`
+     opt_exec_ndb=ndbmtd
+     ;;
+    --mt)
+     opt_exec_ndb=ndbmtd
+     ;;
     --core)
      opt_core="--core"
      ;;
@@ -159,6 +168,7 @@ done
 
 fs_ndb="$fsdir/ndbcluster-$port"
 config_ini=ndb/ndb_config_${ndbd_nodes}_node.ini
+exec_ndb=$exec_ndb/$opt_exec_ndb
 
 NDB_HOME=
 if [ ! -d "$fsdir" ]; then
@@ -191,6 +201,12 @@ ndb_host="localhost"
 ndb_mgmd_port=$port
 NDB_CONNECTSTRING="host=$ndb_host:$ndb_mgmd_port"
 export NDB_CONNECTSTRING
+NDB_MT_LQH=1
+export NDB_MT_LQH
+NDBMT_LQH_THREADS=$ndb_threads
+export NDBMT_LQH_THREADS
+NDBMT_LQH_WORKERS=$ndb_threads
+export NDBMT_LQH_WORKERS
 
 sleep_until_file_created () {
   file=$1

=== modified file 'mysql-test/suite/ndb/r/ndb_dd_alter.result'
--- a/mysql-test/suite/ndb/r/ndb_dd_alter.result	2007-11-19 12:56:54 +0000
+++ b/mysql-test/suite/ndb/r/ndb_dd_alter.result	2008-10-10 09:32:12 +0000
@@ -85,7 +85,7 @@ a5 Decimal(5,1) NULL AT=FIXED ST=DISK
 a6 Time NULL AT=FIXED ST=DISK
 a7 Date NULL AT=FIXED ST=DISK
 a8 Datetime NULL AT=FIXED ST=DISK
-a9 Varchar(255;latin1_swedish_ci) NULL AT=FIXED ST=DISK
+a9 Varchar(255;latin1_swedish_ci) NULL AT=SHORT_VAR ST=DISK
 a10 Blob(256,2000,0) NULL AT=MEDIUM_VAR ST=DISK BV=2
 SELECT * FROM test.t1 ORDER BY a1;
 a1	a2	a3	a4	a5	a6	a7	a8	a9	a10
@@ -184,7 +184,7 @@ a5 Decimal(5,1) NULL AT=FIXED ST=DISK
 a6 Time NULL AT=FIXED ST=DISK
 a7 Date NULL AT=FIXED ST=DISK
 a8 Datetime NULL AT=FIXED ST=DISK
-a9 Varchar(255;latin1_swedish_ci) NULL AT=FIXED ST=DISK
+a9 Varchar(255;latin1_swedish_ci) NULL AT=SHORT_VAR ST=DISK
 a10 Blob(256,2000,0) NULL AT=MEDIUM_VAR ST=DISK BV=2
 SELECT * FROM test.t1 ORDER BY a1;
 a1	a2	a3	a4	a5	a6	a7	a8	a9	a10
@@ -270,7 +270,7 @@ a5 Decimal(5,1) NULL AT=FIXED ST=DISK
 a6 Time NULL AT=FIXED ST=DISK
 a7 Date NULL AT=FIXED ST=DISK
 a8 Datetime NULL AT=FIXED ST=DISK
-a9 Varchar(255;latin1_swedish_ci) NULL AT=FIXED ST=DISK
+a9 Varchar(255;latin1_swedish_ci) NULL AT=SHORT_VAR ST=DISK
 a10 Blob(256,2000,0) NULL AT=MEDIUM_VAR ST=DISK BV=2
 ALTER TABLE test.t1 ENGINE=MyISAM;
 SHOW CREATE TABLE test.t1;

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2008-10-09 16:42:40 +0000
+++ b/sql/ha_ndbcluster.cc	2008-10-13 08:26:39 +0000
@@ -11313,6 +11313,14 @@ int ha_ndbcluster::get_default_no_partit
   uint no_fragments=
     get_no_fragments(max_rows >= min_rows ? max_rows : min_rows);
   uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
+
+  if (getenv("NDBMT_LQH_THREADS"))
+  {
+    int mul= atoi(getenv("NDBMT_LQH_THREADS"));
+    if (mul <= 0)
+      mul= 1;
+    no_nodes*= (uint)mul;
+  }
   if (adjusted_frag_count(no_fragments, no_nodes, reported_frags))
   {
     push_warning(current_thd,

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2008-10-08 05:48:19 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2008-10-09 09:33:43 +0000
@@ -27,6 +27,7 @@
 #include <ndbapi/NdbDictionary.hpp>
 #include <ndbapi/ndb_cluster_connection.hpp>
 #include <util/NdbAutoPtr.hpp>
+#include <portlib/NdbTick.h>
 
 #ifdef ndb_dynamite
 #undef assert
@@ -2270,10 +2271,11 @@ ndb_binlog_thread_handle_schema_event(TH
         case SOT_RENAME_TABLE_NEW:
         {
           uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
-                             "NDB Binlog: Skipping renaming locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
-                             schema->db, schema->name, schema->query,
-                             schema->node_id);
-          
+                                "NDB Binlog: Skipping renaming locally "
+                                "defined table '%s.%s' from binlog schema "
+                                "event '%s' from node %d. ",
+                                schema->db, schema->name, schema->query,
+                                schema->node_id);
           errmsg[end]= '\0';
         }
         // fall through
@@ -2281,9 +2283,11 @@ ndb_binlog_thread_handle_schema_event(TH
           if (schema_type == SOT_DROP_TABLE)
           {
             uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
-                               "NDB Binlog: Skipping dropping locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
-                               schema->db, schema->name, schema->query,
-                               schema->node_id);
+                                  "NDB Binlog: Skipping dropping locally "
+                                  "defined table '%s.%s' from binlog schema "
+                                  "event '%s' from node %d. ",
+                                  schema->db, schema->name, schema->query,
+                                  schema->node_id);
             errmsg[end]= '\0';
           }
           if (! ndbcluster_check_if_local_table(schema->db, schema->name))
@@ -3581,7 +3585,7 @@ ndbcluster_read_binlog_replication(THD *
       dict->getNdbError().classification == NdbError::SchemaError)
   {
     DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
-    if (!table)
+    if (do_set_binlog_flags)
       set_binlog_flags(share, NBT_DEFAULT);
     DBUG_RETURN(0);
   }
@@ -6152,16 +6156,16 @@ ndbcluster_show_status_binlog(THD* thd, 
 
     buflen=
       my_snprintf(buf, sizeof(buf),
-               "latest_epoch=%s, "
-               "latest_trans_epoch=%s, "
-               "latest_received_binlog_epoch=%s, "
-               "latest_handled_binlog_epoch=%s, "
-               "latest_applied_binlog_epoch=%s",
-               llstr(ndb_latest_epoch, buff1),
-               llstr(ndb_get_latest_trans_gci(), buff2),
-               llstr(ndb_latest_received_binlog_epoch, buff3),
-               llstr(ndb_latest_handled_binlog_epoch, buff4),
-               llstr(ndb_latest_applied_binlog_epoch, buff5));
+                  "latest_epoch=%s, "
+                  "latest_trans_epoch=%s, "
+                  "latest_received_binlog_epoch=%s, "
+                  "latest_handled_binlog_epoch=%s, "
+                  "latest_applied_binlog_epoch=%s",
+                  llstr(ndb_latest_epoch, buff1),
+                  llstr(ndb_get_latest_trans_gci(), buff2),
+                  llstr(ndb_latest_received_binlog_epoch, buff3),
+                  llstr(ndb_latest_handled_binlog_epoch, buff4),
+                  llstr(ndb_latest_applied_binlog_epoch, buff5));
     if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
                    "binlog", strlen("binlog"),
                    buf, buflen))

=== modified file 'sql/ha_ndbcluster_lock_ext.h'
--- a/sql/ha_ndbcluster_lock_ext.h	2008-10-09 08:21:38 +0000
+++ b/sql/ha_ndbcluster_lock_ext.h	2008-10-13 08:26:39 +0000
@@ -43,7 +43,7 @@ ndbcluster_global_schema_lock_ext(THD *t
   if (retry_time > 0)
   {
     time_end= NdbTick_CurrentMillisecond();
-    time_end+= retry_time*1000;
+    time_end+= retry_time * 1000;
   }
   while (1)
   {
@@ -79,11 +79,9 @@ ndbcluster_global_schema_lock_ext(THD *t
   retry:
     if (retry_time == 0)
       goto error_handler;
-    if (retry_time > 0)
-    {
-	  if(time_end < NdbTick_CurrentMillisecond())
-	    goto error_handler;
-    }
+    if (retry_time > 0 &&
+        time_end < NdbTick_CurrentMillisecond())
+      goto error_handler;
     if (trans)
     {
       ndb->closeTransaction(trans);

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-08-28 12:23:34 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-10-11 09:22:33 +0000
@@ -255,7 +255,8 @@ TCP_Transporter::doSend() {
   if (used == 0)
     return true;                                // Nothing to send
 
-  int nBytesSent = my_socket_writev(theSocket, m_send_iovec, used);
+  Uint32 iovcnt = used > m_os_max_iovec ? m_os_max_iovec : used;
+  int nBytesSent = my_socket_writev(theSocket, m_send_iovec, iovcnt);
 
   if (nBytesSent > 0)
   {

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2008-08-21 06:32:09 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2008-10-11 13:45:50 +0000
@@ -89,6 +89,16 @@ Transporter::Transporter(TransporterRegi
 
     m_socket_client->set_connect_timeout((m_timeOutMillis+999)/1000);
   }
+
+  m_os_max_iovec = 16;
+#if defined (_SC_IOV_MAX) && defined (HAVE_SYSCONF)
+  long res = sysconf(_SC_IOV_MAX);
+  if (res != (long)-1)
+  {
+    m_os_max_iovec = (Uint32)res;
+  }
+#endif
+  
   DBUG_VOID_RETURN;
 }
 

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2008-08-21 06:32:09 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2008-10-11 09:22:33 +0000
@@ -172,6 +172,7 @@ private:
 
 protected:
   static const Uint32 SEND_IOVEC_SIZE = 64;
+  Uint32 m_os_max_iovec;
   Uint32 m_send_iovec_used;
   struct iovec m_send_iovec[SEND_IOVEC_SIZE];
 

=== modified file 'storage/ndb/src/common/util/ndb_opts.c'
--- a/storage/ndb/src/common/util/ndb_opts.c	2008-10-01 07:13:17 +0000
+++ b/storage/ndb/src/common/util/ndb_opts.c	2008-10-09 20:11:31 +0000
@@ -118,6 +118,11 @@ ndb_std_get_one_option(int optid,
 
 void ndb_std_print_version()
 {
-  printf("MySQL distrib %s, for %s (%s)\n",
-         MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE);
+#ifndef DBUG_OFF
+  const char *suffix= "-debug";
+#else
+  const char *suffix= "";
+#endif
+  printf("MySQL distrib %s%s, for %s (%s)\n",
+         NDB_VERSION_STRING,suffix,SYSTEM_TYPE,MACHINE_TYPE);
 }

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2008-10-08 13:56:11 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2008-10-09 19:17:11 +0000
@@ -4632,6 +4632,66 @@ Backup::execBACKUP_TRIG_REQ(Signal* sign
   signal->theData[0] = result;
 }
 
+BackupFormat::LogFile::LogEntry *
+Backup::get_log_buffer(Signal* signal,
+                       TriggerPtr trigPtr, Uint32 sz)
+{
+  Uint32 * dst;
+  if(ERROR_INSERTED(10030))
+  {
+    jam();
+    dst = 0;
+  }
+  else
+  {
+    jam();
+    FsBuffer & buf = trigPtr.p->operation->dataBuffer;
+    ndbrequire(sz <= buf.getMaxWrite());
+    if (unlikely(!buf.getWritePtr(&dst, sz)))
+    {
+      jam();
+      dst = 0;
+    }
+  }
+
+  if (unlikely(dst == 0))
+  {
+    Uint32 save[TrigAttrInfo::StaticLength];
+    memcpy(save, signal->getDataPtr(), 4*TrigAttrInfo::StaticLength);
+    BackupRecordPtr ptr LINT_SET_PTR;
+    c_backupPool.getPtr(ptr, trigPtr.p->backupPtr);
+    trigPtr.p->errorCode = AbortBackupOrd::LogBufferFull;
+    AbortBackupOrd *ord = (AbortBackupOrd*)signal->getDataPtrSend();
+    ord->backupId = ptr.p->backupId;
+    ord->backupPtr = ptr.i;
+    ord->requestType = AbortBackupOrd::LogBufferFull;
+    ord->senderData= ptr.i;
+    sendSignal(ptr.p->masterRef, GSN_ABORT_BACKUP_ORD, signal,
+               AbortBackupOrd::SignalLength, JBB);
+
+    memcpy(signal->getDataPtrSend(), save, 4*TrigAttrInfo::StaticLength);
+    return 0;
+  }//if
+
+  BackupFormat::LogFile::LogEntry * logEntry =
+    (BackupFormat::LogFile::LogEntry *)dst;
+  logEntry->Length       = 0;
+  logEntry->TableId      = htonl(trigPtr.p->tableId);
+
+  if(trigPtr.p->event==0)
+    logEntry->TriggerEvent= htonl(TriggerEvent::TE_INSERT);
+  else if(trigPtr.p->event==1)
+    logEntry->TriggerEvent= htonl(TriggerEvent::TE_UPDATE);
+  else if(trigPtr.p->event==2)
+    logEntry->TriggerEvent= htonl(TriggerEvent::TE_DELETE);
+  else {
+    ndbout << "Bad Event: " << trigPtr.p->event << endl;
+    ndbrequire(false);
+  }
+
+  return logEntry;
+}
+
 void
 Backup::execTRIG_ATTRINFO(Signal* signal) {
   jamEntry();
@@ -4661,46 +4721,12 @@ Backup::execTRIG_ATTRINFO(Signal* signal
   if(logEntry == 0) 
   {
     jam();
-    Uint32 * dst;
-    FsBuffer & buf = trigPtr.p->operation->dataBuffer;
-    ndbrequire(trigPtr.p->maxRecordSize <= buf.getMaxWrite());
-
-    if(ERROR_INSERTED(10030) ||
-       !buf.getWritePtr(&dst, trigPtr.p->maxRecordSize)) 
+    Uint32 sz = trigPtr.p->maxRecordSize;
+    logEntry = trigPtr.p->logEntry = get_log_buffer(signal, trigPtr, sz);
+    if (unlikely(logEntry == 0))
     {
       jam();
-      Uint32 save[TrigAttrInfo::StaticLength];
-      memcpy(save, signal->getDataPtr(), 4*TrigAttrInfo::StaticLength);
-      BackupRecordPtr ptr LINT_SET_PTR;
-      c_backupPool.getPtr(ptr, trigPtr.p->backupPtr);
-      trigPtr.p->errorCode = AbortBackupOrd::LogBufferFull;
-      AbortBackupOrd *ord = (AbortBackupOrd*)signal->getDataPtrSend();
-      ord->backupId = ptr.p->backupId;
-      ord->backupPtr = ptr.i;
-      ord->requestType = AbortBackupOrd::LogBufferFull;
-      ord->senderData= ptr.i;
-      sendSignal(ptr.p->masterRef, GSN_ABORT_BACKUP_ORD, signal, 
-		 AbortBackupOrd::SignalLength, JBB);
-
-      memcpy(signal->getDataPtrSend(), save, 4*TrigAttrInfo::StaticLength);
       return;
-    }//if
-
-    logEntry = (BackupFormat::LogFile::LogEntry *)dst;
-    trigPtr.p->logEntry = logEntry;
-    logEntry->Length       = 0;
-    logEntry->TableId      = htonl(trigPtr.p->tableId);
-
-
-    if(trigPtr.p->event==0)
-      logEntry->TriggerEvent= htonl(TriggerEvent::TE_INSERT);
-    else if(trigPtr.p->event==1)
-      logEntry->TriggerEvent= htonl(TriggerEvent::TE_UPDATE);
-    else if(trigPtr.p->event==2)
-      logEntry->TriggerEvent= htonl(TriggerEvent::TE_DELETE);
-    else {
-      ndbout << "Bad Event: " << trigPtr.p->event << endl;
-      ndbrequire(false);
     }
   } else {
     ndbrequire(logEntry->TableId == htonl(trigPtr.p->tableId));
@@ -4731,12 +4757,15 @@ Backup::execFIRE_TRIG_ORD(Signal* signal
 
   if(trigPtr.p->errorCode != 0) {
     jam();
+    SectionHandle handle(this, signal);
+    releaseSections(handle);
     return;
   }//if
 
-  if (isNdbMtLqh())
+  if (signal->getNoOfSections())
   {
     jam();
+    SectionHandle handle(this, signal);
     TablePtr tabPtr;
     c_tablePool.getPtr(tabPtr, trigPtr.p->tab_ptr_i);
     FragmentPtr fragPtr;
@@ -4745,8 +4774,26 @@ Backup::execFIRE_TRIG_ORD(Signal* signal
     {
       jam();
       trigPtr.p->logEntry = 0;      
+      releaseSections(handle);
+      return;
+    }
+
+    SegmentedSectionPtr ptr[3];
+    handle.getSection(ptr[0], 0);
+    handle.getSection(ptr[1], 1);
+    handle.getSection(ptr[2], 2);
+    trigPtr.p->logEntry = get_log_buffer(signal,
+                                         trigPtr, ptr[0].sz + ptr[2].sz);
+    if (unlikely(trigPtr.p->logEntry == 0))
+    {
+      jam();
+      releaseSections(handle);
       return;
     }
+    copy(trigPtr.p->logEntry->Data, ptr[0]);
+    copy(trigPtr.p->logEntry->Data+ptr[0].sz, ptr[2]);
+    trigPtr.p->logEntry->Length = ptr[0].sz + ptr[2].sz;
+    releaseSections(handle);
   }
 
   ndbrequire(trigPtr.p->logEntry != 0);

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp	2008-10-05 07:14:08 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp	2008-10-09 19:17:11 +0000
@@ -662,6 +662,8 @@ public:
   void afterGetTabinfoLockTab(Signal *signal,
                               BackupRecordPtr ptr, TablePtr tabPtr);
   void cleanupNextTable(Signal *signal, BackupRecordPtr ptr, TablePtr tabPtr);
+
+  BackupFormat::LogFile::LogEntry* get_log_buffer(Signal*,TriggerPtr, Uint32);
 };
 
 inline

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-10-09 01:13:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2008-10-10 11:39:51 +0000
@@ -3199,11 +3199,22 @@ void Dbdict::execSCHEMA_INFO(Signal* sig
     CRASH_INSERTION(6001);
   }
 
+  {
+    /**
+     * Copy "own" into new
+     */
+    XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
+    XSchemaFile * newxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+    memcpy(&newxsf->schemaPage[0],
+           &oldxsf->schemaPage[0],
+           oldxsf->schemaPage[0].FileSize);
+  }
+
   SectionHandle handle(this, signal);
   SegmentedSectionPtr schemaDataPtr;
   handle.getSection(schemaDataPtr, 0);
 
-  XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+  XSchemaFile * xsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
   ndbrequire(schemaDataPtr.sz % NDB_SF_PAGE_SIZE_IN_WORDS == 0);
   xsf->noOfPages = schemaDataPtr.sz / NDB_SF_PAGE_SIZE_IN_WORDS;
   copy((Uint32*)&xsf->schemaPage[0], schemaDataPtr);
@@ -3217,9 +3228,9 @@ void Dbdict::execSCHEMA_INFO(Signal* sig
     
   validateChecksum(xsf);
 
-  XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
-  checkPendingSchemaTrans(oldxsf);
-  resizeSchemaFile(xsf, oldxsf->noOfPages);
+  XSchemaFile * ownxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+  checkPendingSchemaTrans(ownxsf);
+  resizeSchemaFile(xsf, ownxsf->noOfPages);
 
   ndbrequire(signal->getSendersBlockRef() != reference());
     
@@ -3376,10 +3387,13 @@ operator<<(NdbOut& out, const SchemaFile
 
 void Dbdict::checkSchemaStatus(Signal* signal) 
 {
-  XSchemaFile * newxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
-  XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
-  ndbrequire(newxsf->noOfPages == oldxsf->noOfPages);
-  const Uint32 noOfEntries = newxsf->noOfPages * NDB_SF_PAGE_ENTRIES;
+  // masterxsf == schema file of master (i.e what's currently in cluster)
+  // ownxsf = schema file read from disk
+  XSchemaFile * masterxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
+  XSchemaFile * ownxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+
+  ndbrequire(masterxsf->noOfPages == ownxsf->noOfPages);
+  const Uint32 noOfEntries = masterxsf->noOfPages * NDB_SF_PAGE_ENTRIES;
 
   for (; c_restartRecord.activeTable < noOfEntries;
        c_restartRecord.activeTable++)
@@ -3387,84 +3401,84 @@ void Dbdict::checkSchemaStatus(Signal* s
     jam();
 
     Uint32 tableId = c_restartRecord.activeTable;
-    SchemaFile::TableEntry *newEntry = getTableEntry(newxsf, tableId);
-    SchemaFile::TableEntry *oldEntry = getTableEntry(oldxsf, tableId);
-    SchemaFile::EntryState newState =
-      (SchemaFile::EntryState)newEntry->m_tableState;
-    SchemaFile::EntryState oldState =
-      (SchemaFile::EntryState)oldEntry->m_tableState;
+    SchemaFile::TableEntry *masterEntry = getTableEntry(masterxsf, tableId);
+    SchemaFile::TableEntry *ownEntry = getTableEntry(ownxsf, tableId);
+    SchemaFile::EntryState masterState =
+      (SchemaFile::EntryState)masterEntry->m_tableState;
+    SchemaFile::EntryState ownState =
+      (SchemaFile::EntryState)ownEntry->m_tableState;
 
     if (c_restartRecord.activeTable >= c_tableRecordPool.getSize())
     {
       jam();
-      ndbrequire(newState == SchemaFile::SF_UNUSED);
-      ndbrequire(oldState == SchemaFile::SF_UNUSED);
+      ndbrequire(masterState == SchemaFile::SF_UNUSED);
+      ndbrequire(ownState == SchemaFile::SF_UNUSED);
       continue;
     }//if
 
-    D("checkSchemaStatus" << V(*oldEntry) << V(*newEntry));
+    D("checkSchemaStatus" << V(*ownEntry) << V(*masterEntry));
 
 //#define PRINT_SCHEMA_RESTART
 #ifdef PRINT_SCHEMA_RESTART
     printf("checkSchemaStatus: pass: %d table: %d",
            c_restartRecord.m_pass, tableId);
-    ndbout << "old: " << *oldEntry << " new: " << *newEntry;
+    ndbout << "old: " << *ownEntry << " new: " << *masterEntry;
 #endif
 
     if (c_restartRecord.m_pass <= CREATE_OLD_PASS)
     {
-      if (!::checkSchemaStatus(oldEntry->m_tableType, c_restartRecord.m_pass))
+      if (!::checkSchemaStatus(ownEntry->m_tableType, c_restartRecord.m_pass))
         continue;
 
 
-      if (oldState == SchemaFile::SF_UNUSED)
+      if (ownState == SchemaFile::SF_UNUSED)
         continue;
 
-      restartCreateObj(signal, tableId, oldEntry, true);
+      restartCreateObj(signal, tableId, ownEntry, true);
       return;
     }
 
     if (c_restartRecord.m_pass <= DROP_OLD_PASS)
     {
-      if (!::checkSchemaStatus(oldEntry->m_tableType, c_restartRecord.m_pass))
+      if (!::checkSchemaStatus(ownEntry->m_tableType, c_restartRecord.m_pass))
         continue;
 
-      if (oldState != SchemaFile::SF_IN_USE)
+      if (ownState != SchemaFile::SF_IN_USE)
         continue;
 
-      if (* oldEntry == * newEntry)
+      if (* ownEntry == * masterEntry)
         continue;
 
-      restartDropObj(signal, tableId, oldEntry);
+      restartDropObj(signal, tableId, ownEntry);
       return;
     }
 
     if (c_restartRecord.m_pass <= CREATE_NEW_PASS)
     {
-      if (!::checkSchemaStatus(newEntry->m_tableType, c_restartRecord.m_pass))
+      if (!::checkSchemaStatus(masterEntry->m_tableType, c_restartRecord.m_pass))
         continue;
 
-      if (newState != SchemaFile::SF_IN_USE)
+      if (masterState != SchemaFile::SF_IN_USE)
         continue;
 
       /**
        * handle table(index) special as DIH has already copied
        *   table (using COPY_TABREQ)
        */
-      if (DictTabInfo::isIndex(newEntry->m_tableType) ||
-          DictTabInfo::isTable(newEntry->m_tableType))
+      if (DictTabInfo::isIndex(masterEntry->m_tableType) ||
+          DictTabInfo::isTable(masterEntry->m_tableType))
       {
-        bool file = * oldEntry == *newEntry &&
-          (!DictTabInfo::isIndex(newEntry->m_tableType) || c_systemRestart);
+        bool file = * ownEntry == *masterEntry &&
+          (!DictTabInfo::isIndex(masterEntry->m_tableType) || c_systemRestart);
 
-        restartCreateObj(signal, tableId, newEntry, file);
+        restartCreateObj(signal, tableId, masterEntry, file);
         return;
       }
 
-      if (* oldEntry == *newEntry)
+      if (* ownEntry == *masterEntry)
         continue;
 
-      restartCreateObj(signal, tableId, newEntry, false);
+      restartCreateObj(signal, tableId, masterEntry, false);
       return;
     }
   }
@@ -3793,6 +3807,7 @@ Dbdict::restartCreateObj(Signal* signal,
     c_readTableRecord.m_callback.m_callbackFunction = 
       safe_cast(&Dbdict::restartCreateObj_readConf);
     
+    ndbout_c("restartCreateObj(%u) file: %u", tableId, file);
     startReadTableFile(signal, tableId);
   }
   else
@@ -3949,12 +3964,15 @@ Dbdict::restartDropObj(Signal* signal, 
   case DictTabInfo::OrderedIndex:
     Ptr<DropTableRec> opRecPtr;
     seizeSchemaOp(op_ptr, opRecPtr);
+    ndbrequire(false);
     break;
   case DictTabInfo::Undofile:
   case DictTabInfo::Datafile:
   {
     Ptr<DropFileRec> opRecPtr;
     seizeSchemaOp(op_ptr, opRecPtr);
+    opRecPtr.p->m_request.file_id = tableId;
+    opRecPtr.p->m_request.file_version = entry->m_tableVersion;
     break;
   }
   case DictTabInfo::Tablespace:
@@ -3962,9 +3980,13 @@ Dbdict::restartDropObj(Signal* signal, 
   {
     Ptr<DropFilegroupRec> opRecPtr;
     seizeSchemaOp(op_ptr, opRecPtr);
+    opRecPtr.p->m_request.filegroup_id = tableId;
+    opRecPtr.p->m_request.filegroup_version = entry->m_tableVersion;
     break;
   }
   }
+
+  ndbout_c("restartDropObj(%u)", tableId);
   
   Ptr<TxHandle> tx_ptr;
   c_txHandleHash.getPtr(tx_ptr, c_restartRecord.m_tx_ptr_i);
@@ -17672,6 +17694,12 @@ Dbdict::createFile_parse(Signal* signal,
   }
 
   createFilePtr.p->m_parsed = true;
+
+#if defined VM_TRACE || defined ERROR_INSERT
+  ndbout_c("Dbdict: create name=%s,id=%u,obj_ptr_i=%d",
+           f.FileName, impl_req->file_id, filePtr.p->m_obj_ptr_i);
+#endif
+
   return;
 error:
   if (!filePtr.isNull())
@@ -18297,6 +18325,12 @@ Dbdict::createFilegroup_parse(Signal* si
     increase_ref_count(inc_obj_ptr_i);
   }
   createFilegroupPtr.p->m_parsed = true;
+
+#if defined VM_TRACE || defined ERROR_INSERT
+  ndbout_c("Dbdict: create name=%s,id=%u,obj_ptr_i=%d",
+           fg.FilegroupName, impl_req->filegroup_id, fg_ptr.p->m_obj_ptr_i);
+#endif
+
   return;
 
 error:
@@ -18702,6 +18736,17 @@ Dbdict::dropFile_parse(Signal* signal, b
     setError(error, err, __LINE__);
     return;
   }
+
+#if defined VM_TRACE || defined ERROR_INSERT
+  {
+    char buf[1024];
+    Rope name(c_rope_pool, f_ptr.p->m_path);
+    name.copy(buf);
+    ndbout_c("Dbdict: drop name=%s,id=%u,obj_id=%u", buf, 
+             impl_req->file_id,
+             f_ptr.p->m_obj_ptr_i);
+  }
+#endif
 }
 
 void
@@ -19049,6 +19094,17 @@ Dbdict::dropFilegroup_parse(Signal* sign
     setError(error, err, __LINE__);
     return;
   }
+
+#if defined VM_TRACE || defined ERROR_INSERT
+  {
+    char buf[1024];
+    Rope name(c_rope_pool, fg_ptr.p->m_name);
+    name.copy(buf);
+    ndbout_c("Dbdict: drop name=%s,id=%u,obj_id=%u", buf, 
+             impl_req->filegroup_id,
+             fg_ptr.p->m_obj_ptr_i);
+  }
+#endif
 }
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-10-01 13:27:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2008-10-09 10:11:38 +0000
@@ -260,6 +260,7 @@ public:
     Uint32 nextReplicaNode;
     Uint32 nodeCount;
     Uint32 activeTakeOver; // Which node...
+    Uint32 m_next_log_part;
     Uint32 nodegroupIndex;
     Uint32 m_ref_count;
   };
@@ -1245,7 +1246,6 @@ private:
 
   Uint32 c_nextNodeGroup;
   NodeGroupRecord *nodeGroupRecord;
-  Uint32 c_nextLogPart;
   
   NodeRecord *nodeRecord;
 

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-10-08 14:07:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2008-10-09 19:17:11 +0000
@@ -7021,7 +7021,7 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
         }
         const Uint32 max = NGPtr.p->nodeCount;
 	
-	fragments[count++] = c_nextLogPart++; // Store logpart first
+	fragments[count++] = NGPtr.p->m_next_log_part++; // Store logpart first
 	Uint32 tmp= next_replica_node[NGPtr.i];
         for(Uint32 replicaNo = 0; replicaNo < noOfReplicas; replicaNo++)
         {
@@ -7118,12 +7118,13 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
         for (Uint32 i = primTabPtr.p->totalfragments; i<noOfFragments; i++)
         {
           jam();
-          Uint32 node = find_min_index(fragments_per_node, sizeof(fragments_per_node)/sizeof(fragments_per_node[0]));
-          fragments[count++] = c_nextLogPart++;
-          fragments[count++] = node;
-          fragments_per_node[node]++;
+          Uint32 node = find_min_index(fragments_per_node, 
+                                       NDB_ARRAY_SIZE(fragments_per_node));
           NGPtr.i = getNodeGroup(node);
           ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+          fragments[count++] = NGPtr.p->m_next_log_part++;
+          fragments[count++] = node;
+          fragments_per_node[node]++;
           for (Uint32 r = 0; r<noOfReplicas; r++)
           {
             jam();
@@ -9310,12 +9311,13 @@ Dbdih::execSUB_GCP_COMPLETE_REP(Signal* 
 
   ndbrequire(m_micro_gcp.m_state == MicroGcp::M_GCP_COMMITTED);
   m_micro_gcp.m_state = MicroGcp::M_GCP_IDLE;
+
   /**
-   * To get correct signal order and avoid races, this signal to SUMA is sent
-   * on the same prio as the GCP_PREPARE signal sent to SUMA in
-   * execGPC_PREPARE
+   * To handle multiple LQH instances, this need to be passed though
+   * each LQH...(so that no fire-trig-ord can arrive "too" late)
    */
-  sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal, signal->length(), JBB);
+  sendSignal(DBLQH_REF, GSN_SUB_GCP_COMPLETE_REP, signal,
+             signal->length(), JBB);
 }
 
 /*****************************************************************************/
@@ -13554,7 +13556,6 @@ void Dbdih::initCommonData()
   c_newest_restorable_gci = 0;
   cverifyQueueCounter = 0;
   cwaitLcpSr = false;
-  c_nextLogPart = 0;
   c_nodeStartMaster.blockGcp = false;
 
   nodeResetStart(0);
@@ -13905,6 +13906,7 @@ void Dbdih::initialiseRecordsLab(Signal*
         loopNGPtr.p->activeTakeOver = false;
         loopNGPtr.p->nodegroupIndex = RNIL;
         loopNGPtr.p->m_ref_count = 0;
+        loopNGPtr.p->m_next_log_part = 0;
       }//for
       break;
     }
@@ -15591,7 +15593,7 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
 	Uint32 nodeOrder[MAX_REPLICAS];
 	const Uint32 noOfReplicas = extractNodeInfo(fragPtr.p, nodeOrder);
 	char buf[100];
-	BaseString::snprintf(buf, sizeof(buf), " Table %d Fragment %d - ", tabPtr.i, j);
+	BaseString::snprintf(buf, sizeof(buf), " Table %d Fragment %d(%u) - ", tabPtr.i, j, dihGetInstanceKey(fragPtr));
 	for(Uint32 k = 0; k < noOfReplicas; k++){
 	  char tmp[100];
 	  BaseString::snprintf(tmp, sizeof(tmp), "%d ", nodeOrder[k]);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-09-19 08:52:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2008-10-09 19:17:11 +0000
@@ -2146,6 +2146,7 @@ private:
   void execSTART_RECREF(Signal* signal);
 
   void execGCP_SAVEREQ(Signal* signal);
+  void execSUB_GCP_COMPLETE_REP(Signal* signal);
   void execFSOPENREF(Signal* signal);
   void execFSOPENCONF(Signal* signal);
   void execFSCLOSECONF(Signal* signal);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-09-04 15:45:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2008-10-09 19:17:11 +0000
@@ -344,6 +344,8 @@ Dblqh::Dblqh(Block_context& ctx, Uint32 
   addRecSignal(GSN_DROP_FRAG_REF, &Dblqh::execDROP_FRAG_REF);
   addRecSignal(GSN_DROP_FRAG_CONF, &Dblqh::execDROP_FRAG_CONF);
 
+  addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Dblqh::execSUB_GCP_COMPLETE_REP);
+
   initData();
 
 #ifdef VM_TRACE

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-10-08 14:05:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-10-09 19:17:11 +0000
@@ -12765,6 +12765,18 @@ void Dblqh::execGCP_SAVEREQ(Signal* sign
   return;
 }//Dblqh::execGCP_SAVEREQ()
 
+/**
+ * This is needed for ndbmtd to serialize
+ * SUB_GCP_COMPLETE_REP vs FIRE_TRIG_ORD
+ */
+void
+Dblqh::execSUB_GCP_COMPLETE_REP(Signal* signal)
+{
+  jamEntry();
+  sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal,
+             signal->getLength(), JBB);
+}
+
 /* ------------------------------------------------------------------------- */
 /*  START TIME SUPERVISION OF THE LOG PARTS.                                 */
 /* ------------------------------------------------------------------------- */

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-10-03 13:37:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-10-09 19:17:11 +0000
@@ -78,6 +78,9 @@ DblqhProxy::DblqhProxy(Block_context& ct
   // GSN_EMPTY_LCP_REQ
   addRecSignal(GSN_EMPTY_LCP_REQ, &DblqhProxy::execEMPTY_LCP_REQ);
   addRecSignal(GSN_EMPTY_LCP_CONF, &DblqhProxy::execEMPTY_LCP_CONF);
+
+  // GSN_SUB_GCP_COMPLETE_REP
+  addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
 }
 
 DblqhProxy::~DblqhProxy()
@@ -516,6 +519,19 @@ DblqhProxy::sendGCP_SAVECONF(Signal* sig
   ssRelease<Ss_GCP_SAVEREQ>(ssId);
 }
 
+// GSN_SUB_GCP_COMPLETE_REP
+void
+DblqhProxy::execSUB_GCP_COMPLETE_REP(Signal* signal)
+{
+  jamEntry();
+  for (Uint32 i = 0; i<c_workers; i++)
+  {
+    jam();
+    sendSignal(workerRef(i), GSN_SUB_GCP_COMPLETE_REP, signal,
+               signal->getLength(), JBB);
+  }
+}
+
 // GSN_PREP_DROP_TAB_REQ
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-10-03 13:37:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-10-09 19:17:11 +0000
@@ -159,6 +159,9 @@ protected:
   void execGCP_SAVEREF(Signal*);
   void sendGCP_SAVECONF(Signal*, Uint32 ssId);
 
+  // GSN_SUB_GCP_COMPLETE_REP
+  void execSUB_GCP_COMPLETE_REP(Signal*);
+
   // GSN_PREP_DROP_TAB_REQ
   struct Ss_PREP_DROP_TAB_REQ : SsParallel {
     PrepDropTabReq m_req;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-10-06 06:08:30 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-10-10 09:32:12 +0000
@@ -2336,12 +2336,25 @@ private:
 				 KeyReqStruct *req_struct,
 				 AttributeHeader* ahOut,
 				 Uint32  attrDes2);
+
+  bool readDiskVarAsFixedSizeNotNULL(Uint8* outBuffer,
+				KeyReqStruct *req_struct,
+				AttributeHeader* ahOut,
+				Uint32  attrDes2);
+  
+  bool readDiskVarAsFixedSizeNULLable(Uint8* outBuffer,
+				 KeyReqStruct *req_struct,
+				 AttributeHeader* ahOut,
+				 Uint32  attrDes2);
   bool readDiskVarSizeNULLable(Uint8*, KeyReqStruct*, AttributeHeader*,Uint32);
   bool readDiskVarSizeNotNULL(Uint8*, KeyReqStruct*, AttributeHeader*, Uint32);
 
   bool updateDiskFixedSizeNULLable(Uint32*, KeyReqStruct*, Uint32);
   bool updateDiskFixedSizeNotNULL(Uint32*, KeyReqStruct*, Uint32);
 
+  bool updateDiskVarAsFixedSizeNULLable(Uint32*, KeyReqStruct*, Uint32);
+  bool updateDiskVarAsFixedSizeNotNULL(Uint32*, KeyReqStruct*, Uint32);
+
   bool updateDiskVarSizeNULLable(Uint32*, KeyReqStruct *, Uint32);
   bool updateDiskVarSizeNotNULL(Uint32*, KeyReqStruct *, Uint32);
   

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2008-09-30 15:10:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2008-10-10 09:32:12 +0000
@@ -189,7 +189,8 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* 
       fragOperPtr.p->m_null_bits[ind]++;
     } 
 
-    if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED)
+    if (AttributeDescriptor::getArrayType(attrDescriptor)==NDB_ARRAYTYPE_FIXED
+        || ind==DD)
     {
       jam();
       regTabPtr.p->m_attributes[ind].m_no_of_fixsize++;
@@ -1162,7 +1163,7 @@ Dbtup::computeTableMetaData(Tablerec *re
       regTabPtr->notNullAttributeMask.set(i);
     if (!AttributeDescriptor::getDynamic(attrDescriptor))
     {
-      if(arr == NDB_ARRAYTYPE_FIXED)
+      if(arr == NDB_ARRAYTYPE_FIXED || ind==DD)
       {
         if (attrLen!=0)
         {
@@ -1442,7 +1443,8 @@ void Dbtup::setUpKeyArray(Tablerec* cons
         continue;
       }
 
-      if (AttributeDescriptor::getArrayType(desc) != NDB_ARRAYTYPE_FIXED ||
+      if ((AttributeDescriptor::getArrayType(desc) != NDB_ARRAYTYPE_FIXED 
+           && !AttributeDescriptor::getDiskBased(desc)) ||
           (AttributeDescriptor::getDynamic(desc) &&
            AttributeDescriptor::getArrayType(desc) == NDB_ARRAYTYPE_FIXED &&
            AttributeDescriptor::getSizeInWords(desc) > InternalMaxDynFix))

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-10-08 14:11:47 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-10-10 09:32:12 +0000
@@ -182,17 +182,25 @@ Dbtup::setUpQueryRoutines(Tablerec *regT
 	  r[1] = &Dbtup::readDiskBitsNULLable;
 	  r[2] = &Dbtup::readDiskFixedSizeNotNULL;
 	  r[3] = &Dbtup::readDiskFixedSizeNULLable;
+	  r[4] = &Dbtup::readDiskVarAsFixedSizeNotNULL;
+	  r[5] = &Dbtup::readDiskVarAsFixedSizeNULLable;
+          /*
 	  r[4] = &Dbtup::readDiskVarSizeNULLable;
 	  r[5] = &Dbtup::readDiskVarSizeNotNULL;
+          */
         }
-	UpdateFunction u[6];
+       UpdateFunction u[6];
         {
 	  u[0] = &Dbtup::updateDiskBitsNotNULL;
 	  u[1] = &Dbtup::updateDiskBitsNULLable;
 	  u[2] = &Dbtup::updateDiskFixedSizeNotNULL;
 	  u[3] = &Dbtup::updateDiskFixedSizeNULLable;
+	  u[4] = &Dbtup::updateDiskVarAsFixedSizeNotNULL;
+	  u[5] = &Dbtup::updateDiskVarAsFixedSizeNULLable;
+          /*
 	  u[4] = &Dbtup::updateDiskVarSizeNULLable;
 	  u[5] = &Dbtup::updateDiskVarSizeNotNULL;
+          */
         }
 	Uint32 a= 
 	  AttributeDescriptor::getArrayType(attrDescr) == NDB_ARRAYTYPE_FIXED 
@@ -1393,6 +1401,79 @@ Dbtup::readDiskFixedSizeNULLable(Uint8* 
 }
 
 bool
+Dbtup::readDiskVarAsFixedSizeNotNULL(Uint8* outBuffer,
+				KeyReqStruct *req_struct,
+				AttributeHeader* ahOut,
+				Uint32  attrDes2)
+{
+  ndbassert(req_struct->out_buf_bits == 0);
+
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 *tuple_header= req_struct->m_disk_ptr->m_data;
+  Uint32 indexBuf= req_struct->out_buf_index;
+  Uint32 readOffset= AttributeOffset::getOffset(attrDes2);
+
+  Uint32 maxRead= req_struct->max_read;
+  Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+  Uint8* dst = (outBuffer + indexBuf);
+  const Uint8* src = (Uint8*)(tuple_header+readOffset);
+
+  Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+  Uint32 attrNoOfWords= (srcBytes + 3) >> 2;
+  Uint32 newIndexBuf = indexBuf + srcBytes;
+  Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+  Uint32 lb, len;
+
+  if (typeId != NDB_ARRAYTYPE_FIXED &&
+      NdbSqlUtil::get_var_length(typeId, src, srcBytes, lb, len)) {   
+    srcBytes = len + lb;
+    newIndexBuf = indexBuf + srcBytes;
+    attrNoOfWords= (srcBytes + 3) >> 2;
+   }
+
+  ndbrequire((readOffset + attrNoOfWords - 1) < req_struct->check_offset[DD]);
+  if (! charsetFlag || ! req_struct->xfrm_flag) 
+  {
+    if (newIndexBuf <= maxRead) 
+    {
+      jam(); 
+      ahOut->setByteSize(srcBytes);
+      memcpy(dst, src, srcBytes);
+      zero32(dst, srcBytes);
+      req_struct->out_buf_index = newIndexBuf;
+      return true;
+    }
+  } 
+  else 
+  {
+    return xfrm_reader(dst, req_struct, ahOut, attrDes2, src, srcBytes);
+  } 
+  
+  jam();
+  terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+  return false;
+}
+
+bool
+Dbtup::readDiskVarAsFixedSizeNULLable(Uint8* outBuffer,
+				 KeyReqStruct *req_struct,
+				 AttributeHeader* ahOut,
+				 Uint32  attrDes2)
+{
+  if (!disk_nullFlagCheck(req_struct, attrDes2)) {
+    jam();
+    return readDiskVarAsFixedSizeNotNULL(outBuffer,
+				    req_struct,
+				    ahOut,
+				    attrDes2);
+  } else {
+    jam();
+    ahOut->setNULL();
+    return true;
+  }
+}
+
+bool
 Dbtup::readDiskVarSizeNotNULL(Uint8* out_buffer,
 			      KeyReqStruct *req_struct,
 			      AttributeHeader* ah_out,
@@ -2662,6 +2743,104 @@ Dbtup::updateDiskFixedSizeNULLable(Uint3
 }
 
 bool
+Dbtup::updateDiskVarAsFixedSizeNotNULL(Uint32* inBuffer,
+			      KeyReqStruct* req_struct,
+			      Uint32  attrDes2)
+{
+  Uint32 attrDescriptor= req_struct->attr_descriptor;
+  Uint32 indexBuf= req_struct->in_buf_index;
+  Uint32 inBufLen= req_struct->in_buf_len;
+  Uint32 updateOffset= AttributeOffset::getOffset(attrDes2);
+  Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+  
+  AttributeHeader ahIn(inBuffer[indexBuf]);
+  Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 size_in_words=  ahIn.getDataSize();
+
+  Uint32 newIndex= indexBuf + size_in_words + 1;
+  Uint32 *tuple_header= req_struct->m_disk_ptr->m_data;
+  ndbrequire((updateOffset + noOfWords - 1) < req_struct->check_offset[DD]);
+
+  if (size_in_words <= noOfWords) {
+    if (!nullIndicator) {
+      jam();
+      if (charsetFlag) {
+        jam();
+        Tablerec* regTabPtr = tabptr.p;
+        Uint32 typeId= AttributeDescriptor::getType(attrDescriptor);
+        Uint32 bytes= AttributeDescriptor::getSizeInBytes(attrDescriptor);
+        Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+        ndbrequire(i < regTabPtr->noOfCharsets);
+        // not const in MySQL
+        CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+	int not_used;
+        const char* ssrc = (const char*)&inBuffer[indexBuf + 1];
+        Uint32 lb, len;
+        if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) {
+          jam();
+          terrorCode = ZINVALID_CHAR_FORMAT;
+          return false;
+        }
+	// fast fix bug#7340
+        if (typeId != NDB_TYPE_TEXT &&
+	    (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL, &not_used) != len) {
+          jam();
+          terrorCode = ZINVALID_CHAR_FORMAT;
+          return false;
+        }
+      }
+
+      req_struct->in_buf_index= newIndex;
+      MEMCOPY_NO_WORDS(&tuple_header[updateOffset],
+                       &inBuffer[indexBuf + 1],
+                       size_in_words);
+      return true;
+    } else {
+      jam();
+      terrorCode= ZNOT_NULL_ATTR;
+      return false;
+    }
+  } else {
+    jam();
+    terrorCode= ZAI_INCONSISTENCY_ERROR;
+    return false;
+  }
+}
+
+bool
+Dbtup::updateDiskVarAsFixedSizeNULLable(Uint32* inBuffer,
+				   KeyReqStruct *req_struct,
+				   Uint32  attrDes2)
+{
+  Tablerec* const regTabPtr=  tabptr.p;
+  AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+  Uint32 nullIndicator= ahIn.isNULL();
+  Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+  Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+  
+  if (!nullIndicator) {
+    jam();
+    BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+    return updateDiskVarAsFixedSizeNotNULL(inBuffer,
+				      req_struct,
+				      attrDes2);
+  } else {
+    Uint32 newIndex= req_struct->in_buf_index + 1;
+    if (newIndex <= req_struct->in_buf_len) {
+      BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+      jam();
+      req_struct->in_buf_index= newIndex;
+      return true;
+    } else {
+      jam();
+      terrorCode= ZAI_INCONSISTENCY_ERROR;
+      return false;
+    }
+  }
+}
+
+bool
 Dbtup::updateDiskVarSizeNotNULL(Uint32* in_buffer,
                             KeyReqStruct *req_struct,
                             Uint32 attr_des2)

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2008-10-02 17:51:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2008-10-09 19:17:11 +0000
@@ -947,6 +947,7 @@ out:
 // We start by setting common header info for all TRIG_ATTRINFO signals.
 //--------------------------------------------------------------------
   bool executeDirect;
+  bool longsignal = false;
   TrigAttrInfo* const trigAttrInfo = (TrigAttrInfo *)signal->getDataPtrSend();
   trigAttrInfo->setConnectionPtr(req_struct->TC_index);
   trigAttrInfo->setTriggerId(trigPtr->triggerId);
@@ -965,6 +966,10 @@ out:
     ref = trigPtr->m_receiverRef;
     // executeDirect = !isNdbMtLqh() || (refToMain(ref) != SUMA);
     executeDirect = refToInstance(ref) == instance();
+
+    // If we can do execute direct, lets do that, else do long signal (only local node)
+    longsignal = !executeDirect;
+    ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
     break;
   case (TriggerType::READ_ONLY_CONSTRAINT):
     terrorCode = ZREAD_ONLY_CONSTRAINT_VIOLATION;
@@ -977,44 +982,147 @@ out:
 
   req_struct->no_fired_triggers++;
 
-  trigAttrInfo->setAttrInfoType(TrigAttrInfo::PRIMARY_KEY);
-  sendTrigAttrInfo(signal, keyBuffer, noPrimKey, executeDirect, ref);
+  if (longsignal == false)
+  {
+    jam();
+
+    trigAttrInfo->setAttrInfoType(TrigAttrInfo::PRIMARY_KEY);
+    sendTrigAttrInfo(signal, keyBuffer, noPrimKey, executeDirect, ref);
+
+    switch(regOperPtr->op_struct.op_type) {
+    case(ZINSERT):
+      jam();
+      // Send AttrInfo signals with new attribute values
+      trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES);
+      sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref);
+      break;
+    case(ZDELETE):
+      if (trigPtr->sendBeforeValues) {
+        jam();
+        trigAttrInfo->setAttrInfoType(TrigAttrInfo::BEFORE_VALUES);
+        sendTrigAttrInfo(signal, beforeBuffer, noBeforeWords, executeDirect,ref);
+      }
+      break;
+    case(ZUPDATE):
+      jam();
+      if (trigPtr->sendBeforeValues) {
+        jam();
+        trigAttrInfo->setAttrInfoType(TrigAttrInfo::BEFORE_VALUES);
+        sendTrigAttrInfo(signal, beforeBuffer, noBeforeWords, executeDirect,ref);
+      }
+      trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES);
+      sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref);
+      break;
+    default:
+      ndbrequire(false);
+    }
+  }
+
+  /**
+   * sendFireTrigOrd
+   */
+  FireTrigOrd* const fireTrigOrd = (FireTrigOrd *)signal->getDataPtrSend();
+
+  fireTrigOrd->setConnectionPtr(req_struct->TC_index);
+  fireTrigOrd->setTriggerId(trigPtr->triggerId);
+  fireTrigOrd->fragId= regFragPtr.p->fragmentId;
 
   switch(regOperPtr->op_struct.op_type) {
   case(ZINSERT):
     jam();
-    // Send AttrInfo signals with new attribute values
-    trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES);
-    sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref);
+    fireTrigOrd->m_triggerEvent = TriggerEvent::TE_INSERT;
     break;
   case(ZDELETE):
-    if (trigPtr->sendBeforeValues) {
+    jam();
+    fireTrigOrd->m_triggerEvent = TriggerEvent::TE_DELETE;
+    break;
+  case(ZUPDATE):
+    jam();
+    fireTrigOrd->m_triggerEvent = TriggerEvent::TE_UPDATE;
+    break;
+  default:
+    ndbrequire(false);
+    break;
+  }
+
+  fireTrigOrd->setNoOfPrimaryKeyWords(noPrimKey);
+  fireTrigOrd->setNoOfBeforeValueWords(noBeforeWords);
+  fireTrigOrd->setNoOfAfterValueWords(noAfterWords);
+
+  switch(trigPtr->triggerType) {
+  case (TriggerType::SECONDARY_INDEX):
+  case (TriggerType::REORG_TRIGGER):
+    jam();
+    fireTrigOrd->m_triggerType = trigPtr->triggerType;
+    fireTrigOrd->m_transId1 = req_struct->trans_id1;
+    fireTrigOrd->m_transId2 = req_struct->trans_id2;
+    sendSignal(req_struct->TC_ref, GSN_FIRE_TRIG_ORD,
+               signal, FireTrigOrd::SignalLength, JBB);
+    break;
+  case (TriggerType::SUBSCRIPTION_BEFORE): // Only Suma
+    jam();
+    fireTrigOrd->setGCI(req_struct->gci_hi);
+    fireTrigOrd->setHashValue(req_struct->hash_value);
+    fireTrigOrd->m_any_value = regOperPtr->m_any_value;
+    fireTrigOrd->m_gci_lo = req_struct->gci_lo;
+    if (executeDirect)
+    {
       jam();
-      trigAttrInfo->setAttrInfoType(TrigAttrInfo::BEFORE_VALUES);
-      sendTrigAttrInfo(signal, beforeBuffer, noBeforeWords, executeDirect,ref);
+      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
+                     GSN_FIRE_TRIG_ORD,
+                     signal,
+                     FireTrigOrd::SignalLengthSuma);
+      jamEntry();
+    }
+    else
+    {
+      ndbassert(longsignal);
+      LinearSectionPtr ptr[3];
+      ptr[0].p = keyBuffer;
+      ptr[0].sz = noPrimKey;
+      ptr[1].p = beforeBuffer;
+      ptr[1].sz = noBeforeWords;
+      ptr[2].p = afterBuffer;
+      ptr[2].sz = noAfterWords;
+      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
+                 signal, FireTrigOrd::SignalLengthSuma, JBB, ptr, 3);
     }
     break;
-  case(ZUPDATE):
+  case (TriggerType::SUBSCRIPTION):
     jam();
-    if (trigPtr->sendBeforeValues) {
+    // Since only backup uses subscription triggers we
+    // send to backup directly for now
+    fireTrigOrd->setGCI(req_struct->gci_hi);
+
+    if (executeDirect)
+    {
+      jam();
+      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
+                     GSN_FIRE_TRIG_ORD,
+                     signal,
+                     FireTrigOrd::SignalWithGCILength);
+      jamEntry();
+    }
+    else
+    {
       jam();
-      trigAttrInfo->setAttrInfoType(TrigAttrInfo::BEFORE_VALUES);
-      sendTrigAttrInfo(signal, beforeBuffer, noBeforeWords, executeDirect,ref);
+      // Todo send onlu before/after depending on BACKUP REDO/UNDO
+      ndbassert(longsignal);
+      LinearSectionPtr ptr[3];
+      ptr[0].p = keyBuffer;
+      ptr[0].sz = noPrimKey;
+      ptr[1].p = beforeBuffer;
+      ptr[1].sz = noBeforeWords;
+      ptr[2].p = afterBuffer;
+      ptr[2].sz = noAfterWords;
+      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
+                 signal, FireTrigOrd::SignalWithGCILength, JBB, ptr, 3);
     }
-    trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES);
-    sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref);
     break;
   default:
     ndbrequire(false);
+    break;
   }
-  sendFireTrigOrd(signal,
-                  req_struct,
-                  regOperPtr,
-                  trigPtr,
-		  regFragPtr.p->fragmentId,
-                  noPrimKey,
-                  noBeforeWords,
-                  noAfterWords);
 }
 
 Uint32 Dbtup::setAttrIds(Bitmask<MAXNROFATTRIBUTESINWORDS>& attributeMask, 
@@ -1245,78 +1353,6 @@ void Dbtup::sendFireTrigOrd(Signal* sign
                             Uint32 noBeforeValueWords, 
                             Uint32 noAfterValueWords)
 {
-  FireTrigOrd* const fireTrigOrd = (FireTrigOrd *)signal->getDataPtrSend();
-  
-  fireTrigOrd->setConnectionPtr(req_struct->TC_index);
-  fireTrigOrd->setTriggerId(trigPtr->triggerId);
-  fireTrigOrd->fragId= fragmentId;
-
-  switch(regOperPtr->op_struct.op_type) {
-  case(ZINSERT):
-    jam();
-    fireTrigOrd->m_triggerEvent = TriggerEvent::TE_INSERT;
-    break;
-  case(ZDELETE):
-    jam();
-    fireTrigOrd->m_triggerEvent = TriggerEvent::TE_DELETE;
-    break;
-  case(ZUPDATE):
-    jam();
-    fireTrigOrd->m_triggerEvent = TriggerEvent::TE_UPDATE;
-    break;
-  default:
-    ndbrequire(false);
-    break;
-  }
-
-  fireTrigOrd->setNoOfPrimaryKeyWords(noPrimKeyWords);
-  fireTrigOrd->setNoOfBeforeValueWords(noBeforeValueWords);
-  fireTrigOrd->setNoOfAfterValueWords(noAfterValueWords);
-
-  switch(trigPtr->triggerType) {
-  case (TriggerType::SECONDARY_INDEX):
-  case (TriggerType::REORG_TRIGGER):
-    jam();
-    fireTrigOrd->m_triggerType = trigPtr->triggerType;
-    fireTrigOrd->m_transId1 = req_struct->trans_id1;
-    fireTrigOrd->m_transId2 = req_struct->trans_id2;
-    sendSignal(req_struct->TC_ref, GSN_FIRE_TRIG_ORD, 
-               signal, FireTrigOrd::SignalLength, JBB);
-    break;
-  case (TriggerType::SUBSCRIPTION_BEFORE): // Only Suma
-    jam();
-    fireTrigOrd->setGCI(req_struct->gci_hi);
-    fireTrigOrd->setHashValue(req_struct->hash_value);
-    fireTrigOrd->m_any_value = regOperPtr->m_any_value;
-    fireTrigOrd->m_gci_lo = req_struct->gci_lo;
-    if (refToInstance(trigPtr->m_receiverRef) == instance())
-      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
-                     GSN_FIRE_TRIG_ORD,
-                     signal,
-                     FireTrigOrd::SignalLengthSuma);
-    else
-      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
-                 signal, FireTrigOrd::SignalLengthSuma, JBB);
-    break;
-  case (TriggerType::SUBSCRIPTION):
-    jam();
-    // Since only backup uses subscription triggers we 
-    // send to backup directly for now
-    fireTrigOrd->setGCI(req_struct->gci_hi);
-
-    if (refToInstance(trigPtr->m_receiverRef) == instance())
-      EXECUTE_DIRECT(refToMain(trigPtr->m_receiverRef),
-                     GSN_FIRE_TRIG_ORD,
-                     signal,
-                     FireTrigOrd::SignalWithGCILength);
-    else
-      sendSignal(trigPtr->m_receiverRef, GSN_FIRE_TRIG_ORD,
-                 signal, FireTrigOrd::SignalWithGCILength, JBB);
-    break;
-  default:
-    ndbrequire(false);
-    break;
-  }
 }
 
 /*

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-10-08 14:23:32 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2008-10-09 19:17:11 +0000
@@ -244,9 +244,8 @@ Suma::execSTTOR(Signal* signal) {
   if(m_startphase == 3)
   {
     jam();
-    // wl4391_todo something
-    Uint32 instanceNo = !isNdbMtLqh() ? 0 : 1;
-    ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP, instanceNo)) != 0);
+    void* ptr = m_ctx.m_mm.get_memroot();
+    c_page_pool.set((Buffer_page*)ptr, (Uint32)~0);
   }
 
   if(m_startphase == 5)
@@ -3753,7 +3752,6 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
 {
   jamEntry();
   DBUG_ENTER("Suma::execFIRE_TRIG_ORD");
-  ndbassert(signal->getNoOfSections() == 0);
   
   CRASH_INSERTION(13016);
   FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
@@ -3770,6 +3768,33 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
 
   ndbassert(gci > m_last_complete_gci);
 
+  if (signal->getNoOfSections())
+  {
+    jam();
+    ndbassert(isNdbMtLqh());
+    SectionHandle handle(this, signal);
+
+    ndbrequire(b_bufferLock == 0);
+    ndbrequire(f_bufferLock == 0);
+    f_bufferLock = trigId;
+    b_bufferLock = trigId;
+
+    SegmentedSectionPtr ptr;
+    handle.getSection(ptr, 0); // Keys
+    Uint32 sz = ptr.sz;
+    copy(f_buffer, ptr);
+
+    handle.getSection(ptr, 2); // After values
+    copy(f_buffer + sz, ptr);
+    f_trigBufferSize = sz + ptr.sz;
+
+
+    handle.getSection(ptr, 1); // Before values
+    copy(b_buffer, ptr);
+    releaseSections(handle);
+  }
+
+  jam();
   ndbrequire(f_bufferLock == trigId);
   /**
    * Reset f_bufferLock
@@ -3901,11 +3926,73 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
   jamEntry();
   ndbassert(signal->getNoOfSections() == 0);
 
-  bool drop = false;
   SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
   Uint32 gci_hi = rep->gci_hi;
   Uint32 gci_lo = rep->gci_lo;
   Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
+
+  if (isNdbMtLqh())
+  {
+
+#define SSPP 0
+
+    if (SSPP)
+      printf("execSUB_GCP_COMPLETE_REP(%u/%u)", gci_hi, gci_lo);
+    jam();
+    Uint32 min = m_min_gcp_rep_counter_index;
+    Uint32 sz = NDB_ARRAY_SIZE(m_gcp_rep_counter);
+    for (Uint32 i = min; i != m_max_gcp_rep_counter_index; i = (i + 1) % sz)
+    {
+      jam();
+      if (m_gcp_rep_counter[i].m_gci == gci)
+      {
+        jam();
+        m_gcp_rep_counter[i].m_cnt ++;
+        if (m_gcp_rep_counter[i].m_cnt == m_gcp_rep_cnt)
+        {
+          jam();
+          /**
+           * Release this entry...
+           */
+          if (i != min)
+          {
+            jam();
+            m_gcp_rep_counter[i] = m_gcp_rep_counter[min];
+          }
+          m_min_gcp_rep_counter_index = (min + 1) % sz;
+          if (SSPP)
+            ndbout_c(" found - complete after: (min: %u max: %u)",
+                     m_min_gcp_rep_counter_index,
+                     m_max_gcp_rep_counter_index);
+          goto found;
+        }
+        else
+        {
+          jam();
+          if (SSPP)
+            ndbout_c(" found - wait unchanged: (min: %u max: %u)",
+                     m_min_gcp_rep_counter_index,
+                     m_max_gcp_rep_counter_index);
+          return; // Wait for more...
+        }
+      }
+    }
+    /**
+     * Not found...
+     */
+    Uint32 next = (m_max_gcp_rep_counter_index + 1) % sz;
+    ndbrequire(next != min); // ring buffer full
+    m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_gci = gci;
+    m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_cnt = 1;
+    m_max_gcp_rep_counter_index = next;
+    if (SSPP)
+      ndbout_c(" new - after: (min: %u max: %u)",
+               m_min_gcp_rep_counter_index,
+               m_max_gcp_rep_counter_index);
+    return;
+  }
+found:
+  bool drop = false;
   Uint32 flags = (m_missing_data)
                  ? rep->flags | SubGcpCompleteRep::MISSING_DATA
                  : rep->flags;
@@ -3980,8 +4067,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* s
 	  Page_pos pos= bucket->m_buffer_head;
 	  ndbrequire(pos.m_max_gci < gci);
 
-	  Buffer_page* page= (Buffer_page*)
-	    m_tup->c_page_pool.getPtr(pos.m_page_id);
+	  Buffer_page* page= c_page_pool.getPtr(pos.m_page_id);
 	  ndbout_c("takeover %d", pos.m_page_id);
 	  page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
           page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
@@ -5058,7 +5144,7 @@ Suma::get_buffer_ptr(Signal* signal, Uin
   
   if (likely(pos.m_page_id != RNIL))
   {
-    page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
+    page= c_page_pool.getPtr(pos.m_page_id);
     ptr= page->m_data + pos.m_page_pos;
   }
 
@@ -5121,7 +5207,7 @@ loop:
     pos.m_page_pos = sz;
     pos.m_last_gci = gci;
     
-    page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
+    page= c_page_pool.getPtr(pos.m_page_id);
     page->m_next_page= RNIL;
     ptr= page->m_data;
     goto loop; //
@@ -5150,7 +5236,7 @@ Suma::out_of_buffer_release(Signal* sign
   
   if(tail != RNIL)
   {
-    Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
+    Buffer_page* page= c_page_pool.getPtr(tail);
     bucket->m_buffer_tail = page->m_next_page;
     free_page(tail, page);
     signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
@@ -5202,8 +5288,8 @@ loop:
   Uint32 ref= m_first_free_page;
   if(likely(ref != RNIL))
   {
-    m_first_free_page = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_next_page;
-    Uint32 chunk = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
+    m_first_free_page = (c_page_pool.getPtr(ref))->m_next_page;
+    Uint32 chunk = (c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
     c_page_chunk_pool.getPtr(ptr, chunk);
     ndbassert(ptr.p->m_free);
     ptr.p->m_free--;
@@ -5213,8 +5299,8 @@ loop:
   if(!c_page_chunk_pool.seize(ptr))
     return RNIL;
 
-  Uint32 count;
-  m_tup->allocConsPages(16, count, ref);
+  Uint32 count = 16;
+  m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &ref, &count, 1);
   if (count == 0)
     return RNIL;
 
@@ -5228,7 +5314,7 @@ loop:
   LINT_INIT(page);
   for(Uint32 i = 0; i<count; i++)
   {
-    page = (Buffer_page*)m_tup->c_page_pool.getPtr(ref);
+    page = c_page_pool.getPtr(ref);
     page->m_page_state= SUMA_SEQUENCE;
     page->m_page_chunk_ptr_i = ptr.i;
     page->m_next_page = ++ref;
@@ -5308,7 +5394,7 @@ Suma::release_gci(Signal* signal, Uint32
   else
   {
     jam();
-    Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
+    Buffer_page* page= c_page_pool.getPtr(tail);
     Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
     Uint32 next_page = page->m_next_page;
 
@@ -5419,7 +5505,7 @@ Suma::resend_bucket(Signal* signal, Uint
   Bucket* bucket= c_buckets+buck;
   Uint32 tail= bucket->m_buffer_tail;
 
-  Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
+  Buffer_page* page= c_page_pool.getPtr(tail);
   Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
   Uint32 next_page = page->m_next_page;
   Uint32 *ptr = page->m_data + pos;

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2008-10-08 14:24:20 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2008-10-09 19:17:11 +0000
@@ -597,7 +597,6 @@ private:
   Bucket_mask m_active_buckets;
   Bucket_mask m_switchover_buckets;  
   
-  class Dbtup* m_tup;
   void init_buffers();
   Uint32* get_buffer_ptr(Signal*, Uint32 buck, Uint64 gci, Uint32 sz);
   Uint32 seize_page();
@@ -649,11 +648,23 @@ private:
 
   Uint32 m_first_free_page;
   ArrayPool<Page_chunk> c_page_chunk_pool;
+  ArrayPool<Buffer_page> c_page_pool;
 
 #ifdef VM_TRACE
   Uint64 m_gcp_monitor;
 #endif
 
+  struct SubGcpCompleteCounter
+  {
+    Uint64 m_gci;
+    Uint32 m_cnt;
+  };
+
+  Uint32 m_gcp_rep_cnt;
+  Uint32 m_min_gcp_rep_counter_index;
+  Uint32 m_max_gcp_rep_counter_index;
+  struct SubGcpCompleteCounter m_gcp_rep_counter[10];
+
   /* Buffer used in Suma::execALTER_TAB_REQ(). */
   Uint32 b_dti_buf[MAX_WORDS_META_FILE];
   Uint64 m_current_gci;

=== modified file 'storage/ndb/src/kernel/blocks/suma/SumaInit.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2008-10-05 07:14:21 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp	2008-10-09 19:17:11 +0000
@@ -136,10 +136,16 @@ Suma::Suma(Block_context& ctx) :
 #endif
   m_missing_data = false;
   bzero(c_subscriber_per_node, sizeof(c_subscriber_per_node));
+
+  m_gcp_rep_cnt = getLqhWorkers();
+  m_min_gcp_rep_counter_index = 0;
+  m_max_gcp_rep_counter_index = 0;
+  bzero(m_gcp_rep_counter, sizeof(m_gcp_rep_counter));
 }
 
 Suma::~Suma()
 {
+  c_page_pool.clear();
 }
 
 BLOCK_FUNCTIONS(Suma)

=== modified file 'storage/ndb/src/kernel/vm/VMSignal.hpp'
--- a/storage/ndb/src/kernel/vm/VMSignal.hpp	2007-12-23 12:52:25 +0000
+++ b/storage/ndb/src/kernel/vm/VMSignal.hpp	2008-10-10 11:39:51 +0000
@@ -213,7 +213,8 @@ NodeReceiverGroup::operator=(BlockRefere
 
 inline
 SectionHandle::SectionHandle(SimulatedBlock* b)
-  : m_block(b)
+  : m_cnt(0), 
+    m_block(b)
 {
 }
 

=== modified file 'storage/ndb/src/kernel/vm/mt-asm.h'
--- a/storage/ndb/src/kernel/vm/mt-asm.h	2008-10-08 19:43:24 +0000
+++ b/storage/ndb/src/kernel/vm/mt-asm.h	2008-10-11 09:30:51 +0000
@@ -21,7 +21,10 @@
 #define NDB_MT_ASM_H
 
 #if defined(__GNUC__)
-#if defined(__x86_64__)
+
+/** gcc */
+
+#if defined(__x86_64__) || defined (__i386__)
 /* Memory barriers, these definitions are for x64_64. */
 #define mb()    asm volatile("mfence":::"memory")
 /* According to Intel docs, it does not reorder loads. */
@@ -47,7 +50,7 @@ cpu_pause()
 }
 
 #elif defined(__sparc__)
-#define mb()    asm volatile("membar #LoadLoad | #LoadStore | #StoreLoad | #StoreStore":::"memory)
+#define mb()    asm volatile("membar #LoadLoad | #LoadStore | #StoreLoad | #StoreStore":::"memory")
 #define rmb()   asm volatile("membar #LoadLoad" ::: "memory")
 #define wmb()   asm volatile("membar #StoreStore" ::: "memory")
 #define read_barrier_depends()  do {} while(0)
@@ -56,8 +59,31 @@ cpu_pause()
 extern  int xcng(volatile unsigned * addr, int val);
 extern void cpu_pause();
 
-#elif
-#error "Unsupported architecture"
+#else
+#error "Unsupported architecture (gcc)"
+#endif
+
+#elif defined(__sun)
+
+/** sun studio */
+/**
+ * TODO check that asm ("") implies a compiler barrier
+ *      i.e that it clobbers memory
+ */
+#if defined(__x86_64__)
+#define mb()    asm ("mfence")
+/* According to Intel docs, it does not reorder loads. */
+//#define rmb() asm ("lfence")
+#define rmb()   asm ("")
+#define wmb()   asm ("")
+#define read_barrier_depends()  do {} while(0)
+#elif defined(__sparc)
+#define mb() asm ("membar #LoadLoad | #LoadStore | #StoreLoad | #StoreStore")
+#define rmb() asm ("membar #LoadLoad")
+#define wmb() asm ("membar #StoreStore")
+#define read_barrier_depends()  do {} while(0)
+#else
+#error "Unsupported architecture (sun studio)"
 #endif
 #else
 #error "Unsupported compiler"

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2008-10-08 14:34:13 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2008-10-10 09:32:12 +0000
@@ -3264,10 +3264,7 @@ loop:
     tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF);
     tmpAttr.AttributeExtScale = col->m_scale;
     tmpAttr.AttributeExtLength = col->m_length;
-    if(col->m_storageType == NDB_STORAGETYPE_DISK)
-      tmpAttr.AttributeArrayType = NDB_ARRAYTYPE_FIXED;
-    else
-      tmpAttr.AttributeArrayType = col->m_arrayType;
+    tmpAttr.AttributeArrayType = col->m_arrayType;
 
     if(col->m_pk)
       tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      

=== modified file 'storage/ndb/test/src/NDBT_Tables.cpp'
--- a/storage/ndb/test/src/NDBT_Tables.cpp	2008-08-21 06:36:55 +0000
+++ b/storage/ndb/test/src/NDBT_Tables.cpp	2008-10-10 09:32:12 +0000
@@ -74,15 +74,15 @@ NDBT_Attribute T3Attribs[] = {
   NDBT_Attribute("KOL04", NdbDictionary::Column::Char, 100),
   NDBT_Attribute("KOL05", NdbDictionary::Column::Varbinary, 100),
   NDBT_Attribute("KOL06", NdbDictionary::Column::Char, 100),
-  NDBT_Attribute("KOL07", NdbDictionary::Column::Varbinary, 100),
+  NDBT_Attribute("KOL07", NdbDictionary::Column::Varbinary, 25),
   NDBT_Attribute("KOL08", NdbDictionary::Column::Char, 100),
-  NDBT_Attribute("KOL09", NdbDictionary::Column::Varbinary, 100),
+  NDBT_Attribute("KOL09", NdbDictionary::Column::Varbinary, 25),
   NDBT_Attribute("KOL10", NdbDictionary::Column::Char, 100),
-  NDBT_Attribute("KOL11", NdbDictionary::Column::Varbinary, 100),
+  NDBT_Attribute("KOL11", NdbDictionary::Column::Varbinary, 25),
   NDBT_Attribute("KOL12", NdbDictionary::Column::Char, 100),
-  NDBT_Attribute("KOL13", NdbDictionary::Column::Varbinary, 100),
+  NDBT_Attribute("KOL13", NdbDictionary::Column::Varbinary, 25),
   NDBT_Attribute("KOL14", NdbDictionary::Column::Char, 100),
-  NDBT_Attribute("KOL15", NdbDictionary::Column::Varbinary, 100),
+  NDBT_Attribute("KOL15", NdbDictionary::Column::Longvarbinary, 537),
   NDBT_Attribute("KOL16", NdbDictionary::Column::Char, 100),
   NDBT_Attribute("KOL17", NdbDictionary::Column::Varbinary, 100),
   NDBT_Attribute("KOL18", NdbDictionary::Column::Char, 100),
@@ -397,6 +397,7 @@ NDBT_Attribute D1Attribs[] = {
   NDBT_Attribute("KOL3", NdbDictionary::Column::Unsigned),
   NDBT_Attribute("KOL4", NdbDictionary::Column::Char, 233, false, true, 0, NdbDictionary::Column::StorageTypeDisk),
   NDBT_Attribute("KOL5", NdbDictionary::Column::Unsigned),
+  NDBT_Attribute("KOL6", NdbDictionary::Column::Varbinary, 233, false, true, 0, NdbDictionary::Column::StorageTypeDisk),
 };
 static
 const
@@ -411,6 +412,7 @@ NDBT_Attribute D2Attribs[] = {
   NDBT_Attribute("KOL4", NdbDictionary::Column::Varbinary, 133, false, true, 0, MM, true),
   NDBT_Attribute("KOL5", NdbDictionary::Column::Char, 199, false, true, 0, NdbDictionary::Column::StorageTypeDisk),
   NDBT_Attribute("KOL6", NdbDictionary::Column::Bit, 21, false, false, 0, NdbDictionary::Column::StorageTypeDisk),
+  NDBT_Attribute("KOL7", NdbDictionary::Column::Longvarbinary, 384, false, true, 0, NdbDictionary::Column::StorageTypeDisk),
 };
 static
 const
@@ -757,9 +759,9 @@ NDBT_Attribute WIDE_2COL_ATTRIBS[] = {
   /* Note that we can't have any index on this table as it
    * has no space for the extra FRAGID the index requires!
    */
-  NDBT_Attribute("KEY", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("KEY", NdbDictionary::Column::Longvarbinary, 
                  NDBT_Tables::MaxVarTypeKeyBytes, true), 
-  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarbinary, 
                  NDBT_Tables::MaxKeyMaxVarTypeAttrBytes, false)
 };
 
@@ -776,9 +778,9 @@ NDBT_Table WIDE_2COL("WIDE_2COL", sizeof
 static 
 const
 NDBT_Attribute WIDE_2COL_IX_ATTRIBS[] = {
-  NDBT_Attribute("KEY", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("KEY", NdbDictionary::Column::Longvarbinary,
                  NDBT_Tables::MaxVarTypeKeyBytes, true), 
-  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarbinary,
                  NDBT_Tables::MaxKeyMaxVarTypeAttrBytesIndex , false)
 };
 
@@ -802,9 +804,9 @@ NDBT_Attribute WIDE_MAXKEY_HUGO_ATTRIBS[
   /* Note that we can't have any index on this table as it
    * has no space for the extra FRAGID the index requires!
    */
-  NDBT_Attribute("KEY", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("KEY", NdbDictionary::Column::Longvarbinary,
                  NDBT_Tables::MaxVarTypeKeyBytes, true), 
-  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarbinary,
                  NDBT_Tables::MaxKeyMaxVarTypeAttrBytes -
                  NDBT_Tables::HugoOverheadBytes, false),
   NDBT_Attribute("HUGOID", NdbDictionary::Column::Unsigned,
@@ -827,7 +829,7 @@ const
 NDBT_Attribute WIDE_MAXATTR_HUGO_ATTRIBS[] = {
   NDBT_Attribute("KEY", NdbDictionary::Column::Unsigned,
                  1, true), 
-  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarchar, 
+  NDBT_Attribute("ATTR", NdbDictionary::Column::Longvarbinary,
                  NDBT_Tables::MinKeyMaxVarTypeAttrBytes -
                  NDBT_Tables::HugoOverheadBytes, false),
   NDBT_Attribute("HUGOID", NdbDictionary::Column::Unsigned,

Thread
bzr push into mysql-5.1 branch (stewart:3000 to 3001) Stewart Smith13 Oct