List: Commits
From: magnus.blaudd
Date: November 12 2012 3:04pm
Subject: bzr push into mysql-trunk-cluster branch (magnus.blaudd:3567 to 3568)
 3568 magnus.blaudd@stripped	2012-11-12 [merge]
      Merge 7.3 -> trunk-cluster

    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_glue.h
      storage/ndb/cmake/os/Windows.cmake
      storage/ndb/compile-cluster
      storage/ndb/include/kernel/signaldata/LCP.hpp
      storage/ndb/include/mgmapi/mgmapi.h
      storage/ndb/memcache/include/int3korr.h
      storage/ndb/src/common/debugger/signaldata/LCP.cpp
      storage/ndb/src/kernel/blocks/backup/Backup.cpp
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/mgmsrv/testConfig.cpp
      storage/ndb/test/ndbapi/testInterpreter.cpp
 3567 magnus.blaudd@stripped	2012-11-12
      ndb
        - fix unintentional(mystery) revert of some disabled.def files for ndb

    modified:
      mysql-test/suite/ndb/t/disabled.def
      mysql-test/suite/ndb_memcache/t/disabled.def
      mysql-test/suite/rpl_ndb/t/disabled.def
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2012-11-06 14:16:49 +0000
+++ b/sql/ha_ndbcluster.cc	2012-11-12 15:03:21 +0000
@@ -8982,8 +8982,7 @@ static int create_ndb_column(THD *thd,
   }
   else
     col.setAutoIncrement(FALSE);
- 
-#ifndef NDB_WITHOUT_COLUMN_FORMAT
+
   DBUG_PRINT("info", ("storage: %u  format: %u  ",
                       field->field_storage_type(),
                       field->column_format()));
@@ -9018,7 +9017,6 @@ static int create_ndb_column(THD *thd,
       dynamic= (create_info->row_type == ROW_TYPE_DYNAMIC);
     break;
   }
-#endif
   DBUG_PRINT("info", ("Column %s is declared %s", field->field_name,
                       (dynamic) ? "dynamic" : "static"));
   if (type == NDBCOL::StorageTypeDisk)
@@ -9030,7 +9028,6 @@ static int create_ndb_column(THD *thd,
       dynamic= false;
     }
 
-#ifndef NDB_WITHOUT_COLUMN_FORMAT
     if (thd && field->column_format() == COLUMN_FORMAT_TYPE_DYNAMIC)
     {
       push_warning_printf(thd, Sql_condition::SL_WARNING,
@@ -9040,7 +9037,6 @@ static int create_ndb_column(THD *thd,
                           "column will become FIXED",
                           field->field_name);
     }
-#endif
   }
 
   switch (create_info->row_type) {
@@ -9537,7 +9533,6 @@ int ha_ndbcluster::create(const char *na
     KEY_PART_INFO *end= key_part + key_info->user_defined_key_parts;
     for (; key_part != end; key_part++)
     {
-#ifndef NDB_WITHOUT_COLUMN_FORMAT
       if (key_part->field->field_storage_type() == HA_SM_DISK)
       {
         push_warning_printf(thd, Sql_condition::SL_WARNING,
@@ -9550,7 +9545,6 @@ int ha_ndbcluster::create(const char *na
         result= HA_ERR_UNSUPPORTED;
         goto abort_return;
       }
-#endif
       tab.getColumn(key_part->fieldnr-1)->setStorageType(
                              NdbDictionary::Column::StorageTypeMemory);
     }
@@ -9990,7 +9984,6 @@ int ha_ndbcluster::create_ndb_index(THD
   for (; key_part != end; key_part++) 
   {
     Field *field= key_part->field;
-#ifndef NDB_WITHOUT_COLUMN_FORMAT
     if (field->field_storage_type() == HA_SM_DISK)
     {
       push_warning_printf(thd, Sql_condition::SL_WARNING,
@@ -10002,7 +9995,6 @@ int ha_ndbcluster::create_ndb_index(THD
                           "STORAGE DISK is not supported");
       DBUG_RETURN(HA_ERR_UNSUPPORTED);
     }
-#endif
     DBUG_PRINT("info", ("attr: %s", field->field_name));
     if (ndb_index.addColumnName(field->field_name))
     {

=== modified file 'sql/ha_ndbcluster_glue.h'
--- a/sql/ha_ndbcluster_glue.h	2012-11-01 11:46:09 +0000
+++ b/sql/ha_ndbcluster_glue.h	2012-11-12 15:03:21 +0000
@@ -77,19 +77,12 @@ bool close_cached_tables(THD *thd, TABLE
 
 #endif
 
-
-extern ulong opt_server_id_mask;
-
 static inline
 uint32 thd_unmasked_server_id(const THD* thd)
 {
-#ifndef NDB_WITHOUT_SERVER_ID_BITS
   const uint32 unmasked_server_id = thd->unmasked_server_id;
   assert(thd->server_id == (thd->unmasked_server_id & opt_server_id_mask));
   return unmasked_server_id;
-#else
-  return thd->server_id;
-#endif
 }
 
 

=== modified file 'storage/ndb/cmake/os/Windows.cmake'
--- a/storage/ndb/cmake/os/Windows.cmake	2011-05-24 08:45:38 +0000
+++ b/storage/ndb/cmake/os/Windows.cmake	2012-11-12 10:58:00 +0000
@@ -21,3 +21,9 @@
 GET_FILENAME_COMPONENT(_SCRIPT_DIR ${CMAKE_CURRENT_LIST_FILE} PATH)
 INCLUDE(${_SCRIPT_DIR}/WindowsCache.cmake)
 
+IF(MSVC)
+  # Enable "Full Path of Source Code File in Diagnostics" to avoid
+  # "guessing" which file was causing warnings or error
+  SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /FC")
+  SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /FC")
+ENDIF()

=== modified file 'storage/ndb/compile-cluster'
--- a/storage/ndb/compile-cluster	2012-11-06 14:08:41 +0000
+++ b/storage/ndb/compile-cluster	2012-11-12 14:57:58 +0000
@@ -1,6 +1,6 @@
 #!/usr/bin/perl
 
-# Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -35,6 +35,8 @@ my $opt_build_type;
 my $opt_build = 1;
 my $opt_just_print;
 my $opt_vanilla;
+my $opt_autotest;
+my $opt_parse_log;
 Getopt::Long::Configure("pass_through");
 GetOptions(
 
@@ -46,6 +48,12 @@ GetOptions(
   'c|just-configure' => sub { $opt_build = 0; },
   'n|just-print' => \$opt_just_print,
   'vanilla' => \$opt_vanilla,
+
+  'autotest' => \$opt_autotest,
+
+  # Special switch --parse-log=<file> which reads a log file (from build) and
+  # parses it for warnings
+  'parse-log=s' => \$opt_parse_log,
 ) or exit(1);
 
 # Find source root directory, assume this script is
@@ -59,6 +67,23 @@ if ($^O eq "cygwin") {
   chomp $opt_srcdir;
 }
 
+# Parse given log file for warnings
+if ($opt_parse_log)
+{
+  use IO::File;
+  my $file = IO::File->new($opt_parse_log, 'r')
+      or die "Failed to open file $opt_parse_log: $!";
+  my $parser = WarningParser->new(srcdir => $opt_srcdir,
+				  unified => 1,
+                                  verbose => 1);
+  while (my $line = <$file>)
+  {
+    $parser->parse_line($line);
+  }
+  $parser->report($0);
+  exit(0);
+}
+
 # Check that cmake exists and figure out it's version 
 my $cmake_version_id;
 {
@@ -124,6 +149,12 @@ if(defined $ENV{"CXXFLAGS"} and $ENV{"CX
     push(@args, "-DWITH_NDB_TEST=1");
   }
 
+  if ($opt_autotest)
+  {
+    print("compile-cluster: autotest build requested, extra everything\n");
+    push(@args, "-DWITH_NDB_CCFLAGS='-DERROR_INSERT'");
+  }
+
   # The cmake generator to use
   if ($opt_build_type)
   {
@@ -344,7 +375,7 @@ if (!$opt_build)
       push(@args, $config);
     }
 
-    cmd("cmake", @args);
+    build_cmd("cmake", @args);
 
   }
   else
@@ -353,22 +384,335 @@ if (!$opt_build)
     die "You need to install cmake with version > 2.8"
       if ($^O eq "cygwin" or $^O eq "MSWin32");
 
-    cmd("make");
+    build_cmd("make");
   }
 }
 
+exit(0);
+
+
 sub cmd {
   my ($cmd, @a)= @_;
-  
-  if ($opt_just_print){
-    print "$cmd ", join(' ', @a), "\n";
-    return; 
-  }
-  
-  print "compile-cluster: calling '$cmd ", join(' ', @a), "'\n";
+  my $cmd_str = join(' ', $cmd, @a);
+  print "compile-cluster: '$cmd_str'\n";
+  return if ($opt_just_print);
   system($cmd, @a)
-    and print("command failed: $!\n")
-      and exit(1);
+    and print("command '$cmd_str' failed\n")
+	  and exit(1);
 }
 
-exit(0);
+use IPC::Open2;
+sub build_cmd {
+  my ($cmd, @args) = @_;
+  my $cmd_str = join(' ', $cmd, @args);
+  print "compile-cluster: '$cmd_str'\n";
+  return if ($opt_just_print);
+  $cmd_str.= " 2>&1";
+  my ($chld_out, $chld_in);
+  my $pid = open2($chld_out, $chld_in, $cmd_str) or die $!;
+  # Create warning parser and pass every ouput line through it
+  my $parser = WarningParser->new(srcdir => $opt_srcdir,
+				  unified => 1,
+                                  verbose => 1);
+  while (my $line = <$chld_out>)
+  {
+    if (!$parser->parse_line($line))
+    {
+      # Warning parser didn't print the line, print it
+      print $line;
+    }
+  }
+  waitpid($pid, 0);
+  my $exit_status = $?;
+  my $exit_code = ($exit_status >> 8);
+  print "Build completed with exit_code: $exit_code(status: $exit_status)\n";
+  if ($exit_code)
+  {
+    print("command '$cmd_str' failed: $!\n");
+    exit(1);
+  }
+  $parser->report($0);
+}
+
+
+# Perl class used by WarningParser for keeping
+# track of one individual warning
+#
+package WarningParser::Warning;
+use strict;
+
+sub new {
+  my ($class, $file, $line, $text)= @_;
+  my $self= bless {
+    FILE => $file,
+    LINE => $line,
+    TEXT => $text,
+  }, $class;
+  return $self;
+}
+
+sub file {
+  my ($self) = @_;
+  return $self->{FILE};
+}
+
+sub line {
+  my ($self) = @_;
+  return $self->{LINE};
+}
+
+sub text {
+  my ($self) = @_;
+  return $self->{TEXT};
+}
+
+# Print the warning in verbose format for easier debugging
+sub print_verbose {
+  my ($self) = @_;
+
+  print "{\n";
+  foreach my $key (keys %$self)
+  {
+    print "  $key => '$self->{$key}'\n";
+  }
+  print "}\n";
+}
+
+# Print the warning in unified format(easy for automated build system to parse)
+# emulate gcc
+sub print_unified {
+  my ($self) = @_;
+  my $file = $self->file();
+  my $line = $self->line();
+  my $text = $self->text();
+  print "$file:$line: warning: $text\n";
+}
+
+sub suppress {
+  my ($self, $message) = @_;
+  die if exists $self->{SUPPRESSED}; # Already suppressed
+  die unless $message; # No message
+  $self->{SUPPRESSED} = $message;
+}
+
+sub is_suppressed {
+  my ($self) = @_;
+  return exists $self->{SUPPRESSED};
+}
+
+sub is_cluster_warning {
+  my ($self) = @_;
+  my $file = $self->{FILE};
+  # Have the string ndb in the file name(including
+  # directory so everything below storage/ndb is
+  # automatically included)
+  if ($file =~ /ndb/)
+  {
+    return 1;
+  }
+  return 0;
+}
+
+
+package WarningParser;
+use strict;
+use Cwd 'abs_path';
+
+sub new {
+  my $class= shift;
+  my %opts= ( @_ );
+  my $srcdir = $opts{srcdir} || die "Must supply srcdir";
+  my $verbose = $opts{verbose} || 0;
+  my $unified = $opts{unified} || 0;
+  my $track_dirs = $opts{track_dirs} || 0;
+
+  my $self= bless {
+    # empty array of warnings
+    WARNINGS => [],
+
+    # print each warning object as they are accumulated
+    VERBOSE => $verbose,
+
+    # print warnings in unified format(i.e the format
+    # is converted to look like standard gcc). This makes it
+    # easy for higher level tools to parse the warnings
+    # regardless of compiler.
+    UNIFIED => $unified,
+
+    # Need to keep track of current dir since file name in
+    # warnings does not include directory(this is normal
+    # in makefiles generated by automake)
+    TRACK_DIRS => $track_dirs,
+
+    # Location of source
+    SRCDIR => $srcdir,
+
+  }, $class;
+  return $self;
+}
+
+sub new_warning {
+  my ($self, $file, $line, $text) = @_;
+  if ($self->{TRACK_DIRS})
+  {
+    # file does not contain directory, add currently
+    # tracked dir
+    my $dir = $self->{DIR};
+    $file= "$dir/$file";
+  }
+  my $srcdir = $self->{SRCDIR};
+  # srcdir is in abs_path form, convert also file to abs_path
+  $file = abs_path($file);
+  $file =~ s/^$srcdir//; # Remove leading srcdir
+  $file =~ s:^\/::; # Remove leading slash
+
+  return WarningParser::Warning->new($file, $line, $text);
+}
+
+sub parse_warning {
+  my ($self, $line) = @_;
+
+  if ($self->{TRACK_DIRS})
+  {
+    # Track current directory by parsing makes
+    # "Entering/Leaving directory" messages
+    if ($line =~ /Entering directory \`(.*)\'/)
+    {
+      my $dir= $1;
+      # Push previous dir onto stack before setting new
+      push(@{$self->{DIRSTACK}}, $self->{DIR});
+      $self->{DIR}= $dir;
+    }
+
+    if ($line =~ /Leaving directory \`(.*)\'/)
+    {
+      # Pop previous dir from stack and set it as current
+      my $prevdir= pop(@{$self->{DIRSTACK}});
+      $self->{DIR}= $prevdir;
+    }
+  }
+
+  # cmake and Visual Studio 10(seems to use msbuild)
+  if ($line =~ /^\s*(.*)\((\d+)\): warning ([^ ]*:.*)$/)
+  {
+    return $self->new_warning($1, $2, $3);
+  }
+
+  # cmake and Visual Studio 9
+  if ($line =~ /^(\d+>)?(?:[a-z]:)?([^:()]*)\((\d+)\) : warning ([^ ]*:.*)$/)
+  {
+    my ($project, $file, $lineno, $text) = ($1, $2, $3, $4);
+    return $self->new_warning($file, $lineno, $text);
+  }
+
+  # cmake and gcc with line number AND column
+  if ($line =~ /([^ ]+\.(c|h|cc|cpp|hpp|ic|i|y|l)):([0-9]+):([0-9]+):[ \t]*warning:[ \t]*(.*)$/)
+  {
+    my ($file, $junk, $lineno, $colno, $text) = ($1, $2, $3, $4, $5);
+    return $self->new_warning($file, $lineno, $text);
+  }
+
+  # cmake and gcc
+  if ($line =~ /([^ ]+\.(c|h|cc|cpp|hpp|ic|i|y|l)):[ \t]*([0-9]+):[ \t]*warning:[ \t]*(.*)$/)
+  {
+    return $self->new_warning($1, $3, $4);
+  }
+
+  return undef;
+}
+
+sub suppress_warning {
+  my ($self, $w) = @_;
+
+  # Ignore files not owned by cluster team
+  if (!$w->is_cluster_warning())
+  {
+    $w->suppress('Warning in file not owned by cluster team');
+    return 1;
+  }
+
+  # List of supressions consisting of one regex for the dir+file name
+  # and one for the warning text. The suppression is stored as a
+  # list of arrays, where each array contains two precompiled
+  # regexes. If both expressions match, the warning is suppressed.
+  my @suppressions = (
+    # [ qr/<dirname+filename regex>/, qr/<warning regex>/ ],
+  );
+
+  foreach my $sup ( @suppressions )
+  {
+    my $file_pat = $sup->[0];
+    my $text_pat = $sup->[1];
+    if ($w->file() =~ /$file_pat/ and
+	$w->text() =~ /$text_pat/)
+    {
+      $w->suppress("Suppressed by file suppression: '$file_pat, $text_pat'");
+      return 1;
+    }
+  }
+
+  return 0;
+}
+
+# Parse a line for warnings and return 1 if warning was
+# found(even if it was suppressed)
+#
+sub parse_line {
+  my ($self, $line) = @_;
+  $self->{LINES}++;
+
+  # Remove trailing line feed and new line
+  $line =~ s/[\r]+$//g;
+  $line =~ s/[\n]+$//g;
+
+  my $w = $self->parse_warning($line);
+  if (defined $w)
+  {
+    if (!$self->suppress_warning($w))
+    {
+      if ($self->{UNIFIED})
+      {
+        # Print the warning in UNIFIED format
+        $w->print_unified();
+      }
+      else
+      {
+        # Just echo the line verbatim
+	print "\n$line\n";
+      }
+    }
+    # Print the warning object in verbose mode
+    $w->print_verbose() if $self->{VERBOSE};
+
+    # Save the warning for final report
+    push(@{$self->{WARNINGS}}, $w);
+
+    return 1;
+  }
+
+  return 0;
+}
+
+sub report {
+  my ($self, $prefix) = @_;
+  my $lines = $self->{LINES};
+
+  my $warnings = 0;
+  my $suppressed= 0;
+
+  foreach my $w (@{$self->{WARNINGS}})
+  {
+    if ($w->is_suppressed())
+    {
+      $suppressed++;
+    }
+    else
+    {
+      $warnings++;
+    }
+  }
+  my $total = $warnings + $suppressed;
+  print "$prefix: $warnings warnings found(suppressed $suppressed of total $total)\n";
+}
+
+1;

=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp	2012-05-21 23:05:17 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp	2012-11-07 18:00:23 +0000
@@ -246,7 +246,7 @@ public:
 
 private:
   Uint32 senderRef;
-  Uint32 reqData;
+  Uint32 senderData;
 };
 
 struct LcpStatusConf
@@ -278,7 +278,7 @@ public:
   };
 private:
   Uint32 senderRef;
-  Uint32 reqData;
+  Uint32 senderData;
   /* Backup stuff */
   Uint32 lcpState;
   /* In lcpState == LCP_IDLE, refers to prev LCP
@@ -289,11 +289,18 @@ private:
   Uint32 lcpDoneBytesHi;
   Uint32 lcpDoneBytesLo;
   
-  /* Backup stuff valid iff lcpState == LCP_SCANNING */
   Uint32 tableId;
   Uint32 fragId;
-  Uint32 replicaDoneRowsHi;
-  Uint32 replicaDoneRowsLo;
+  /* Backup stuff valid iff lcpState == LCP_SCANNING or
+   * LCP_SCANNED
+   * For LCP_SCANNING contains row count of rows scanned
+   *  (Increases as scan proceeds)
+   * For LCP_SCANNED contains bytes remaining to be flushed
+   * to file.
+   *  (Decreases as buffer drains to file)
+   */
+  Uint32 completionStateHi;
+  Uint32 completionStateLo;
 };
 
 struct LcpStatusRef
@@ -325,7 +332,7 @@ public:
 
 private:
   Uint32 senderRef;
-  Uint32 reqData;
+  Uint32 senderData;
   Uint32 error;
 };
 

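Note on the LcpStatusConf change above: the completionStateHi/Lo words are recombined into a single 64-bit value by the requestor, and the meaning of that value depends on lcpState, as the new comment in LCP.hpp describes. Below is a standalone sketch of that interpretation; the type aliases and enum tags are illustrative stand-ins, the real definitions live in LCP.hpp and ndb_types.h.

  // Standalone sketch (not kernel code): combining completionState{Hi,Lo}
  // from LCP_STATUS_CONF and reading it according to lcpState.
  #include <cstdio>

  typedef unsigned int       Uint32;   // stand-ins for ndb_types.h
  typedef unsigned long long Uint64;

  // Illustrative tags only; the real values come from LcpStatusConf::LcpState.
  enum LcpStateSketch { LCP_SCANNING, LCP_SCANNED, LCP_OTHER };

  void reportCompletionState(LcpStateSketch lcpState,
                             Uint32 completionStateHi,
                             Uint32 completionStateLo)
  {
    const Uint64 completionState =
      (((Uint64)completionStateHi) << 32) + completionStateLo;

    if (lcpState == LCP_SCANNING)
      printf("%llu rows scanned so far\n", completionState);      // increases
    else if (lcpState == LCP_SCANNED)
      printf("%llu bytes remaining to flush\n", completionState); // decreases
    else
      printf("completionState is not valid in this state\n");
  }
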
=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h	2011-09-29 09:23:04 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h	2012-11-07 13:42:55 +0000
@@ -402,7 +402,11 @@ extern "C" {
   void ndb_mgm_destroy_handle(NdbMgmHandle * handle);
 
   /**
-   * Set a name of the handle.  Name is reported in cluster log.
+   * Set a name of the handle.
+   *
+   * NOTE! Name is reported in cluster log only when the
+   * handle subsequently is used to allocate a nodeid
+   * using ndb_mgm_alloc_nodeid().
    *
    * @param   handle        Management handle
    * @param   name          Name

=== modified file 'storage/ndb/memcache/include/int3korr.h'
--- a/storage/ndb/memcache/include/int3korr.h	2011-11-07 18:47:48 +0000
+++ b/storage/ndb/memcache/include/int3korr.h	2012-11-12 13:53:54 +0000
@@ -30,4 +30,4 @@
 #define uint3korr(A)  (Uint32) (((Uint32) ((Uint8) (A)[0])) +\
                                 (((Uint32) ((Uint8) (A)[1])) << 8) +\
                                 (((Uint32) ((Uint8) (A)[2])) << 16))
-                                
\ No newline at end of file
+

=== modified file 'storage/ndb/src/common/debugger/signaldata/LCP.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LCP.cpp	2012-05-21 22:27:28 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LCP.cpp	2012-11-07 17:29:01 +0000
@@ -95,8 +95,8 @@ printLCP_STATUS_REQ(FILE * output, const
                     Uint32 len, Uint16 receiverBlockNo){
   const LcpStatusReq* const sig = (LcpStatusReq*) theData;
   
-  fprintf(output, " SenderRef : %x ReqData : %u\n", 
-          sig->senderRef, sig->reqData);
+  fprintf(output, " SenderRef : %x SenderData : %u\n", 
+          sig->senderRef, sig->senderData);
   return true;
 }
 
@@ -105,10 +105,10 @@ printLCP_STATUS_CONF(FILE * output, cons
                      Uint32 len, Uint16 receiverBlockNo){
   const LcpStatusConf* const sig = (LcpStatusConf*) theData;
   
-  fprintf(output, " SenderRef : %x ReqData : %u LcpState : %u tableId : %u fragId : %u\n",
-          sig->senderRef, sig->reqData, sig->lcpState, sig->tableId, sig->fragId);
-  fprintf(output, " replica(DoneRows : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n",
-          (((Uint64)sig->replicaDoneRowsHi) << 32) + sig->replicaDoneRowsLo,
+  fprintf(output, " SenderRef : %x SenderData : %u LcpState : %u tableId : %u fragId : %u\n",
+          sig->senderRef, sig->senderData, sig->lcpState, sig->tableId, sig->fragId);
+  fprintf(output, " replica(Progress : %llu), lcpDone (Rows : %llu, Bytes : %llu)\n",
+          (((Uint64)sig->completionStateHi) << 32) + sig->completionStateLo,
           (((Uint64)sig->lcpDoneRowsHi) << 32) + sig->lcpDoneRowsLo,
           (((Uint64)sig->lcpDoneBytesHi) << 32) + sig->lcpDoneBytesLo);
   return true;
@@ -119,7 +119,7 @@ printLCP_STATUS_REF(FILE * output, const
                     Uint32 len, Uint16 receiverBlockNo){
   const LcpStatusRef* const sig = (LcpStatusRef*) theData;
   
-  fprintf(output, " SenderRef : %x, ReqData : %u Error : %u\n", 
-          sig->senderRef, sig->reqData, sig->error);
+  fprintf(output, " SenderRef : %x, SenderData : %u Error : %u\n", 
+          sig->senderRef, sig->senderData, sig->error);
   return true;
 }

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2012-09-13 20:47:06 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2012-11-09 14:37:51 +0000
@@ -4792,6 +4792,13 @@ Backup::ready_to_write(bool ready, Uint3
   ndbout << endl << "Current Millisecond is = ";
   ndbout << NdbTick_CurrentMillisecond() << endl;
 #endif
+
+  if (ERROR_INSERTED(10043) && eof)
+  {
+    /* Block indefinitely without closing the file */
+    return false;
+  }
+
   if ((ready || eof) &&
       m_words_written_this_period <= m_curr_disk_write_speed)
   {
@@ -5966,7 +5973,7 @@ Backup::execLCP_STATUS_REQ(Signal* signa
   const LcpStatusReq* req = (const LcpStatusReq*) signal->getDataPtr();
   
   const Uint32 senderRef = req->senderRef;
-  const Uint32 reqData = req->reqData;
+  const Uint32 senderData = req->senderData;
   Uint32 failCode = LcpStatusRef::NoLCPRecord;
 
   /* Find LCP backup, if there is one */
@@ -5985,18 +5992,23 @@ Backup::execLCP_STATUS_REQ(Signal* signa
       switch (ptr.p->slaveState.getState())
       {
       case STARTED:
+        jam();
         state = LcpStatusConf::LCP_PREPARED;
         break;
       case SCANNING:
+        jam();
         state = LcpStatusConf::LCP_SCANNING;
         break;
       case STOPPING:
+        jam();
         state = LcpStatusConf::LCP_SCANNED;
         break;
       case DEFINED:
+        jam();
         state = LcpStatusConf::LCP_IDLE;
         break;
       default:
+        jam();
         ndbout_c("Unusual LCP state in LCP_STATUS_REQ() : %u",
                  ptr.p->slaveState.getState());
         state = LcpStatusConf::LCP_IDLE;
@@ -6007,12 +6019,12 @@ Backup::execLCP_STATUS_REQ(Signal* signa
       
       LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
       conf->senderRef = reference();
-      conf->reqData = reqData;
+      conf->senderData = senderData;
       conf->lcpState = state;
       conf->tableId = UnsetConst;
       conf->fragId = UnsetConst;
-      conf->replicaDoneRowsHi = UnsetConst;
-      conf->replicaDoneRowsLo = UnsetConst;
+      conf->completionStateHi = UnsetConst;
+      conf->completionStateLo = UnsetConst;
       setWords(ptr.p->noOfRecords,
                conf->lcpDoneRowsHi,
                conf->lcpDoneRowsLo);
@@ -6020,22 +6032,15 @@ Backup::execLCP_STATUS_REQ(Signal* signa
                conf->lcpDoneBytesHi,
                conf->lcpDoneBytesLo);
       
-      if (state == LcpStatusConf::LCP_SCANNING)
+      if (state == LcpStatusConf::LCP_SCANNING ||
+          state == LcpStatusConf::LCP_SCANNED)
       {
-        /* Actually scanning a fragment, let's grab the details */
+        jam();
+        /* Actually scanning/closing a fragment, let's grab the details */
         TablePtr tabPtr;
         FragmentPtr fragPtr;
         BackupFilePtr filePtr;
         
-        ptr.p->tables.first(tabPtr);
-        if (tabPtr.i == RNIL)
-        {
-          jam();
-          failCode = LcpStatusRef::NoTableRecord;
-          break;
-        }
-        tabPtr.p->fragments.getPtr(fragPtr, 0);
-        ndbrequire(fragPtr.p->tableId == tabPtr.p->tableId);
         if (ptr.p->dataFilePtr == RNIL)
         {
           jam();
@@ -6044,11 +6049,40 @@ Backup::execLCP_STATUS_REQ(Signal* signa
         }
         c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
         ndbrequire(filePtr.p->backupPtr == ptr.i);
-        conf->tableId = tabPtr.p->tableId;
-        conf->fragId = fragPtr.p->fragmentId;
-        setWords(filePtr.p->operation.noOfRecords,
-                 conf->replicaDoneRowsHi,
-                 conf->replicaDoneRowsLo);
+
+        ptr.p->tables.first(tabPtr);
+        if (tabPtr.i != RNIL)
+        {
+          jam();
+          tabPtr.p->fragments.getPtr(fragPtr, 0);
+          ndbrequire(fragPtr.p->tableId == tabPtr.p->tableId);
+          conf->tableId = tabPtr.p->tableId;
+          conf->fragId = fragPtr.p->fragmentId;
+        }
+        
+        if (state == LcpStatusConf::LCP_SCANNING)
+        {
+          jam();
+          setWords(filePtr.p->operation.noOfRecords,
+                   conf->completionStateHi,
+                   conf->completionStateLo);
+        }
+        else if (state == LcpStatusConf::LCP_SCANNED)
+        {
+          jam();
+          /* May take some time to drain the FS buffer, depending on
+           * size of buff, achieved rate.
+           * We provide the buffer fill level so that requestors
+           * can observe whether there's progress in this phase.
+           */
+          Uint64 flushBacklog = 
+            filePtr.p->operation.dataBuffer.getUsableSize() -
+            filePtr.p->operation.dataBuffer.getFreeSize();
+          
+          setWords(flushBacklog,
+                   conf->completionStateHi,
+                   conf->completionStateLo);
+        }
       }
       
       failCode = 0;
@@ -6067,7 +6101,7 @@ Backup::execLCP_STATUS_REQ(Signal* signa
   LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
   
   ref->senderRef = reference();
-  ref->reqData = reqData;
+  ref->senderData = senderData;
   ref->error = failCode;
   
   sendSignal(senderRef, GSN_LCP_STATUS_REF, 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-10-17 14:43:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-11-07 18:00:23 +0000
@@ -1095,6 +1095,8 @@ public:
     STATIC_CONST( PollingPeriodMillis = 10000 ); /* 10s */
     STATIC_CONST( WarnPeriodsWithNoProgress = 2); /* 20s */
     STATIC_CONST( MaxPeriodsWithNoProgress = 6 ); /* 60s */
+
+    SimulatedBlock* block;
     
     /* Should the watchdog be running? */
     bool scan_running;
@@ -1102,45 +1104,23 @@ public:
     /* Is there an active thread? */
     bool thread_active;
     
-    /* LCP position info from Backup block */
+    /* LCP position and state info from Backup block */
+    LcpStatusConf::LcpState lcpState;
     Uint32 tableId;
     Uint32 fragId;
-    Uint64 rowCount;
+    Uint64 completionStatus;
 
     /* Number of periods with no LCP progress observed */ 
     Uint32 pollCount;
 
     /* Reinitialise the watchdog */
-    void reset()
-    {
-      scan_running = false;
-      tableId = ~Uint32(0);
-      fragId = ~Uint32(0);
-      rowCount = ~Uint64(0);
-      pollCount = 0;
-    }
+    void reset();
 
     /* Handle an LCP Status report */
-    void handleLcpStatusRep(Uint32 repTableId,
+    void handleLcpStatusRep(LcpStatusConf::LcpState repLcpState,
+                            Uint32 repTableId,
                             Uint32 repFragId,
-                            Uint64 repRowCount)
-    {
-      if (scan_running)
-      {
-        if ((repRowCount != rowCount) ||
-            (repFragId != fragId) ||
-            (repTableId != tableId))
-        {
-          /* Something moved since last time, reset
-           * poll counter and data.
-           */
-          pollCount = 0;
-          tableId = repTableId;
-          fragId = repFragId;
-          rowCount = repRowCount;
-        }
-      }
-    }
+                            Uint64 repCompletionStatus);
   };
   
   LCPFragWatchdog c_lcpFragWatchdog;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2012-08-13 14:03:42 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2012-11-07 18:00:23 +0000
@@ -96,6 +96,7 @@ void Dblqh::initData()
 
   c_max_parallel_scans_per_frag = 32;
 
+  c_lcpFragWatchdog.block = this;
   c_lcpFragWatchdog.reset();
   c_lcpFragWatchdog.thread_active = false;
 }//Dblqh::initData()

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-11-06 08:01:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-11-07 18:07:24 +0000
@@ -23649,7 +23649,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
     /* Send LCP_STATUS_REQ to BACKUP */
     LcpStatusReq* req = (LcpStatusReq*) signal->getDataPtr();
     req->senderRef = reference();
-    req->reqData = 0;
+    req->senderData = 0;
     
     BlockReference backupRef = calcInstanceBlockRef(BACKUP);
     sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
@@ -23996,7 +23996,7 @@ Dblqh::invokeLcpFragWatchdogThread(Signa
   
   LcpStatusReq* req = (LcpStatusReq*)signal->getDataPtr();
   req->senderRef = cownref;
-  req->reqData = 1;
+  req->senderData = 1;
   BlockReference backupRef = calcInstanceBlockRef(BACKUP);
   sendSignal(backupRef, GSN_LCP_STATUS_REQ, signal,
              LcpStatusReq::SignalLength, JBB);
@@ -24008,7 +24008,7 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa
   jamEntry();
   LcpStatusConf* conf = (LcpStatusConf*) signal->getDataPtr();
   
-  if (conf->reqData == 0)
+  if (conf->senderData == 0)
   {
     /* DUMP STATE variant */
     ndbout_c("Received LCP_STATUS_CONF from %x", conf->senderRef);
@@ -24016,8 +24016,8 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa
              conf->lcpState,
              conf->tableId,
              conf->fragId);
-    ndbout_c("  Replica done rows %llu",
-             (((Uint64)conf->replicaDoneRowsHi) << 32) + conf->replicaDoneRowsLo);
+    ndbout_c("  Completion State %llu",
+             (((Uint64)conf->completionStateHi) << 32) + conf->completionStateLo);
     ndbout_c("  Lcp done rows %llu, done bytes %llu",
              (((Uint64)conf->lcpDoneRowsHi) << 32) + conf->lcpDoneRowsLo,
              (((Uint64)conf->lcpDoneBytesHi) << 32) + conf->lcpDoneBytesLo);
@@ -24026,10 +24026,11 @@ Dblqh::execLCP_STATUS_CONF(Signal* signa
   /* We can ignore the LCP status as if it's complete then we should
    * promptly stop watching
    */
-  c_lcpFragWatchdog.handleLcpStatusRep(conf->tableId,
+  c_lcpFragWatchdog.handleLcpStatusRep((LcpStatusConf::LcpState)conf->lcpState,
+                                       conf->tableId,
                                        conf->fragId,
-                                       (((Uint64)conf->replicaDoneRowsHi) << 32) + 
-                                       conf->replicaDoneRowsLo);
+                                       (((Uint64)conf->completionStateHi) << 32) + 
+                                       conf->completionStateLo);
 }
 
 void
@@ -24038,12 +24039,53 @@ Dblqh::execLCP_STATUS_REF(Signal* signal
   jamEntry();
   LcpStatusRef* ref = (LcpStatusRef*) signal->getDataPtr();
 
-  ndbout_c("Received LCP_STATUS_REF from %x, reqData = %u with error code %u",
-           ref->senderRef, ref->reqData, ref->error);
+  ndbout_c("Received LCP_STATUS_REF from %x, senderData = %u with error code %u",
+           ref->senderRef, ref->senderData, ref->error);
 
   ndbrequire(false);
 }
 
+void
+Dblqh::LCPFragWatchdog::reset()
+{
+  jamBlock(block);
+  scan_running = false;
+  lcpState = LcpStatusConf::LCP_IDLE;
+  tableId = ~Uint32(0);
+  fragId = ~Uint32(0);
+  completionStatus = ~Uint64(0);
+  pollCount = 0;
+}
+
+void
+Dblqh::LCPFragWatchdog::handleLcpStatusRep(LcpStatusConf::LcpState repLcpState,
+                                           Uint32 repTableId,
+                                           Uint32 repFragId,
+                                           Uint64 repCompletionStatus)
+{
+  jamBlock(block);
+  if (scan_running)
+  {
+    jamBlock(block);
+    if ((repCompletionStatus != completionStatus) ||
+        (repFragId != fragId) ||
+        (repTableId != tableId) ||
+        (repLcpState != lcpState))
+    {
+      jamBlock(block);
+      /* Something moved since last time, reset
+       * poll counter and data.
+       */
+      pollCount = 0;
+      lcpState = repLcpState;
+      tableId = repTableId;
+      fragId = repFragId;
+      completionStatus = repCompletionStatus;
+    }
+  }
+}
+
+
 /**
  * checkLcpFragWatchdog
  *
@@ -24071,20 +24113,27 @@ Dblqh::checkLcpFragWatchdog(Signal* sign
       LCPFragWatchdog::WarnPeriodsWithNoProgress)
   {
     jam();
+    const char* completionStatusString = 
+      (c_lcpFragWatchdog.lcpState == LcpStatusConf::LCP_SCANNING?
+       "rows completed":
+       "bytes remaining.");
+    
     warningEvent("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
-                 "  %llu rows completed",
+                 "  %llu %s",
                  c_lcpFragWatchdog.tableId,
                  c_lcpFragWatchdog.fragId,
                  (LCPFragWatchdog::PollingPeriodMillis * 
                   c_lcpFragWatchdog.pollCount) / 1000,
-                 c_lcpFragWatchdog.rowCount);
+                 c_lcpFragWatchdog.completionStatus,
+                 completionStatusString);
     ndbout_c("LCP Frag watchdog : No progress on table %u, frag %u for %u s."
-             "  %llu rows completed",
+             "  %llu %s",
              c_lcpFragWatchdog.tableId,
              c_lcpFragWatchdog.fragId,
              (LCPFragWatchdog::PollingPeriodMillis * 
               c_lcpFragWatchdog.pollCount) / 1000,
-             c_lcpFragWatchdog.rowCount);
+             c_lcpFragWatchdog.completionStatus,
+             completionStatusString);
     
     if (c_lcpFragWatchdog.pollCount >= 
         LCPFragWatchdog::MaxPeriodsWithNoProgress)

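The DblqhMain.cpp changes above feed the new lcpState/completionStatus pair into the LCP fragment watchdog, whose constants are declared in Dblqh.hpp (10 s polling period, warn after 2 idle periods, act after 6). A standalone sketch of that poll/reset/escalate pattern follows; names are hypothetical and none of the block plumbing (jamBlock, signals) is included.

  // Standalone sketch of the LCP frag watchdog pattern: poll a progress
  // indicator each period, reset the counter whenever anything moved,
  // warn after WarnPeriods and report a stall after MaxPeriods.
  #include <cstdint>
  #include <cstdio>

  struct WatchdogSketch {
    static const uint32_t WarnPeriodsWithNoProgress = 2;  // ~20s at 10s polls
    static const uint32_t MaxPeriodsWithNoProgress  = 6;  // ~60s at 10s polls

    uint64_t lastStatus = ~uint64_t(0);
    uint32_t pollCount  = 0;

    // Called once per polling period with the latest completion status.
    // Returns true when the caller should treat the LCP as stalled.
    bool poll(uint64_t status)
    {
      if (status != lastStatus) {      // progress observed, start over
        lastStatus = status;
        pollCount = 0;
        return false;
      }
      pollCount++;
      if (pollCount >= WarnPeriodsWithNoProgress)
        printf("warning: no LCP progress for %u periods\n", (unsigned)pollCount);
      return pollCount >= MaxPeriodsWithNoProgress;
    }
  };
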
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2012-09-19 07:09:57 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2012-11-08 12:02:48 +0000
@@ -152,6 +152,12 @@ public:
   };
   typedef Ptr<TableRecord> TableRecordPtr;
 
+  enum Buffer_type {
+    BUFFER_VOID  = 0,
+    BUFFER_STACK = 1,
+    BUFFER_VAR   = 2
+  };
+
   struct RowRef
   {
     Uint32 m_page_id;
@@ -159,7 +165,7 @@ public:
     union
     {
       Uint16 unused;
-      Uint16 m_allocator;
+      enum Buffer_type m_alloc_type:16;
     };
 
     void copyto_link(Uint32 * dst) const {
@@ -198,7 +204,6 @@ public:
   struct RowPtr
   {
     Uint32 m_type;
-    Uint32 m_src_node_no;
     Uint32 m_src_node_ptrI;
     Uint32 m_src_correlation;
 
@@ -234,8 +239,26 @@ public:
     };
   };
 
-  struct SLFifoRowList
+  struct RowBuffer;  // forward decl.
+
+  /**
+   * Define overlayed 'base class' for SLFifoRowList and RowMap.
+   * As we want these to be POD struct, we does not use 
+   * inheritance, but have to take care that first part
+   * of these struct are correctly overlayed.
+   */
+  struct RowCollectionBase
   {
+    RowBuffer* m_rowBuffer;
+  };
+
+  struct SLFifoRowList //: public RowCollectionBase
+  {
+    /**
+     * BEWARE: Overlayed 'struct RowCollectionBase'
+     */
+    RowBuffer* m_rowBuffer;
+
     /**
      * Data used for a single linked list of rows
      */
@@ -244,13 +267,22 @@ public:
     Uint16 m_first_row_page_pos;
     Uint16 m_last_row_page_pos;
 
+    void construct(RowBuffer& rowBuffer) {
+      m_rowBuffer = &rowBuffer;
+      init();
+    }
     void init() { m_first_row_page_id = RNIL;}
     bool isNull() const { return m_first_row_page_id == RNIL; }
   };
 
-  struct RowMap
+  struct RowMap //: public RowCollectionBase
   {
     /**
+     * BEWARE: Overlayed 'struct RowCollectionBase'
+     */
+    RowBuffer* m_rowBuffer;
+
+    /**
      * Data used for a map with rows (key is correlation id)
      *   currently a single array is used to store row references
      *   (size == batch size)
@@ -259,7 +291,18 @@ public:
     Uint16 m_size;                // size of array
     Uint16 m_elements;            // #elements in array
 
-    void init() { m_map_ref.setNull();}
+    void construct(RowBuffer& rowBuffer,
+                   Uint32 capacity)
+    {
+      m_rowBuffer = &rowBuffer;
+      m_size = capacity;
+      init();
+    }
+    void init() {
+      m_map_ref.setNull();
+      m_elements = 0;
+    }
+
     bool isNull() const { return m_map_ref.isNull(); }
 
     void assign (RowRef ref) {
@@ -296,7 +339,14 @@ public:
     STATIC_CONST( MAP_SIZE_PER_REF_16 = 3 );
   };
 
-  struct SLFifoRowListIterator
+  /**
+   * Define overlayed 'base class' for SLFifoRowListIterator
+   * and RowMapIterator.
+   * As we want these to be POD struct, we does not use 
+   * inheritance, but have to take care that first part
+   * of these struct are correctly overlayed.
+   */
+  struct RowIteratorBase
   {
     RowRef m_ref;
     Uint32 * m_row_ptr;
@@ -305,27 +355,96 @@ public:
     void setNull() { m_ref.setNull(); }
   };
 
-  struct SLFifoRowListIteratorPtr
+  struct SLFifoRowListIterator //: public RowIteratorBase
   {
+    /**
+     * BEWARE: Overlayed 'struct RowIteratorBase'
+     */
     RowRef m_ref;
+    Uint32 * m_row_ptr;
+
+    bool isNull() const { return m_ref.isNull(); }
+    void setNull() { m_ref.setNull(); }
+    // END: RowIteratorBase
   };
 
-  struct RowMapIterator
+  struct RowMapIterator //: public RowIteratorBase
   {
+    /**
+     * BEWARE: Overlayed 'struct RowIteratorBase'
+     */
+    RowRef m_ref;
     Uint32 * m_row_ptr;
+
+    bool isNull() const { return m_ref.isNull(); }
+    void setNull() { m_ref.setNull(); }
+    // END: RowIteratorBase
+
     Uint32 * m_map_ptr;
-    RowRef m_ref; // position of actual row
     Uint16 m_size;
     Uint16 m_element_no;
-    bool isNull() const { return m_ref.isNull(); }
-    void setNull() { m_ref.setNull(); }
   };
 
-  struct RowMapIteratorPtr
+  /**
+   * Abstraction of SLFifoRowList & RowMap
+   */
+  struct RowCollection
   {
-    Uint32 m_element_no;
+    enum collection_type
+    {
+      COLLECTION_VOID,
+      COLLECTION_MAP,
+      COLLECTION_LIST
+    };
+    union
+    {
+      RowCollectionBase m_base;  // Common part for map & list
+      SLFifoRowList m_list;
+      RowMap m_map;
+    };
+
+    RowCollection() : m_type(COLLECTION_VOID) {}
+
+    void construct(collection_type type,
+                   RowBuffer& rowBuffer,
+                   Uint32 capacity)
+    {
+      m_type = type;
+      if (m_type == COLLECTION_MAP)
+        m_map.construct(rowBuffer,capacity);
+      else if (m_type == COLLECTION_LIST)
+        m_list.construct(rowBuffer);
+    }
+
+    void init() {
+      if (m_type == COLLECTION_MAP)
+        m_map.init();
+      else if (m_type == COLLECTION_LIST)
+        m_list.init();
+    }
+
+    Uint32 rowOffset() const {
+      return (m_type == COLLECTION_MAP) ? 0 : 2;
+    }
+
+    collection_type m_type;
   };
 
+  struct RowIterator
+  {
+    union
+    {
+      RowIteratorBase m_base;  // Common part for map & list
+      SLFifoRowListIterator m_list;
+      RowMapIterator m_map;
+    };
+    RowCollection::collection_type m_type;
+
+    RowIterator() { init(); }
+    void init() { m_base.setNull(); }
+    bool isNull() const { return m_base.isNull(); }
+  };
+ 
 
   /**
    * A struct used when building an TreeNode
@@ -368,11 +487,24 @@ public:
 
   struct RowBuffer
   {
-    RowBuffer() { stack_init(); }
+    enum Buffer_type m_type;
+
+    RowBuffer() : m_type(BUFFER_VOID) {}
     DLFifoList<RowPage>::Head m_page_list;
 
-    void stack_init() { new (&m_page_list) DLFifoList<RowPage>::Head(); m_stack.m_pos = 0xFFFF; }
-    void var_init() { new (&m_page_list) DLFifoList<RowPage>::Head(); m_var.m_free = 0; }
+    void init(enum Buffer_type type)
+    {
+      new (&m_page_list) DLFifoList<RowPage>::Head();
+      m_type = type;
+      reset();
+    }
+    void reset()
+    {
+      if (m_type == BUFFER_STACK)
+        m_stack.m_pos = 0xFFFF;
+      else if (m_type == BUFFER_VAR)
+        m_var.m_free = 0;
+    }
 
     struct Stack
     {
@@ -840,11 +972,7 @@ public:
     /**
      * Rows buffered by this node
      */
-    union
-    {
-      RowMap m_row_map;
-      SLFifoRowList m_row_list;
-    };
+    RowCollection m_rows;
 
     union
     {
@@ -892,7 +1020,7 @@ public:
       RT_SCAN                = 0x1  // unbounded result set, scan interface
       ,RT_ROW_BUFFERS        = 0x2  // Do any of the node use row-buffering
       ,RT_MULTI_SCAN         = 0x4  // Is there several scans in request
-      ,RT_VAR_ALLOC          = 0x8  // Is var-allocation used for row-buffer
+//    ,RT_VAR_ALLOC          = 0x8  // DEPRECATED
       ,RT_NEED_PREPARE       = 0x10 // Does any node need m_prepare hook
       ,RT_NEED_COMPLETE      = 0x20 // Does any node need m_complete hook
       ,RT_REPEAT_SCAN_RESULT = 0x40 // Repeat bushy scan result when required
@@ -1122,6 +1250,7 @@ private:
    */
   const OpInfo* getOpInfo(Uint32 op);
   Uint32 build(Build_context&,Ptr<Request>,SectionReader&,SectionReader&);
+  Uint32 initRowBuffers(Ptr<Request>);
   void checkPrepareComplete(Signal*, Ptr<Request>, Uint32 cnt);
   void start(Signal*, Ptr<Request>);
   void checkBatchComplete(Signal*, Ptr<Request>, Uint32 cnt);
@@ -1138,7 +1267,6 @@ private:
   void releaseScanBuffers(Ptr<Request> requestPtr);
   void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
   void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
-  void releaseRow(Ptr<Request>, RowRef ref);
   void registerActiveCursor(Ptr<Request>, Ptr<TreeNode>);
   void nodeFail_checkRequests(Signal*);
   void cleanup_common(Ptr<Request>, Ptr<TreeNode>);
@@ -1146,31 +1274,37 @@ private:
   /**
    * Row buffering
    */
-  Uint32 storeRow(Ptr<Request>, Ptr<TreeNode>, RowPtr &row);
+  Uint32 storeRow(RowCollection& collection, RowPtr &row);
+  void releaseRow(RowCollection& collection, RowRef ref);
   Uint32* stackAlloc(RowBuffer& dst, RowRef&, Uint32 len);
   Uint32* varAlloc(RowBuffer& dst, RowRef&, Uint32 len);
+  Uint32* rowAlloc(RowBuffer& dst, RowRef&, Uint32 len);
 
-  void add_to_list(SLFifoRowList & list, RowRef rowref);
-  Uint32 add_to_map(Ptr<Request> requestPtr, Ptr<TreeNode>, Uint32, RowRef);
-  Uint32 * get_row_ptr(const RowMap&, RowMapIterator pos);
-  void setupRowPtr(Ptr<TreeNode>, RowPtr& dst, RowRef, const Uint32 * src);
-
-  // NOTE: ref contains info about it being stack/var
-  // so adding an inline would be nice...but that remove possibility
-  // to add jam()'s
-  Uint32 * get_row_ptr_stack(RowRef pos);
-  Uint32 * get_row_ptr_var(RowRef pos);
+  void add_to_list(SLFifoRowList & list, RowRef);
+  Uint32 add_to_map(RowMap& map, Uint32, RowRef);
+
+  void setupRowPtr(const RowCollection& collection,
+                   RowPtr& dst, RowRef, const Uint32 * src);
+  Uint32 * get_row_ptr(RowRef pos);
 
   /**
    * SLFifoRowListIterator
    */
-  bool first(Ptr<Request>, Ptr<TreeNode>, SLFifoRowListIterator&);
+  bool first(const SLFifoRowList& list, SLFifoRowListIterator&);
   bool next(SLFifoRowListIterator&);
-  bool next(Ptr<Request>, Ptr<TreeNode>, SLFifoRowListIterator&, SLFifoRowListIteratorPtr);
 
-  bool first(Ptr<Request>, Ptr<TreeNode>, RowMapIterator&);
+  /**
+   * RowMapIterator
+   */
+  bool first(const RowMap& map, RowMapIterator&);
   bool next(RowMapIterator&);
-  bool next(Ptr<Request>,Ptr<TreeNode>, RowMapIterator&, RowMapIteratorPtr);
+
+  /**
+   * RowIterator:
+   * Abstraction which may iterate either a RowList or Map
+   */
+  bool first(const RowCollection&, RowIterator&);
+  bool next(RowIterator&);
 
   /**
    * Misc

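The "BEWARE: Overlayed 'struct RowCollectionBase'" comments in the Dbspj.hpp hunk above rely on every union member starting with an identical leading member, so the shared part can be read through a common view no matter which variant was filled in. A standalone illustration of that idiom with hypothetical types (not the SPJ structs themselves):

  // Minimal illustration of the POD "overlayed base" idiom used by
  // RowCollection/RowIterator: all union members share the same first
  // member, so it can be read via m_base regardless of the active variant.
  #include <cassert>

  struct BaseView    { int* m_buffer; };               // the shared leading part
  struct ListVariant { int* m_buffer; int m_first; };  // same first member
  struct MapVariant  { int* m_buffer; int m_size;  };  // same first member

  struct Collection {
    enum Type { LIST, MAP } m_type;
    union {
      BaseView    m_base;
      ListVariant m_list;
      MapVariant  m_map;
    };
  };

  int main()
  {
    int storage = 0;
    Collection c;
    c.m_type = Collection::LIST;
    c.m_list.m_buffer = &storage;          // written through the list variant
    assert(c.m_base.m_buffer == &storage); // read through the common "base"
    return 0;
  }
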
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2012-10-05 07:45:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2012-11-08 12:02:48 +0000
@@ -44,34 +44,31 @@
 #include <signaldata/ReadNodesConf.hpp>
 #include <signaldata/SignalDroppedRep.hpp>
 
-// Use DEBUG to print messages that should be
-// seen only when we debug the product
-
 #ifdef VM_TRACE
 
+/**
+ * DEBUG options for different parts od SPJ block
+ * Comment out those part you don't want DEBUG'ed.
+ */
 #define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
-#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl;
-#define DEBUG_LQHKEYREQ
-#define DEBUG_SCAN_FRAGREQ
-
-#else
+//#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl;
+//#define DEBUG_LQHKEYREQ
+//#define DEBUG_SCAN_FRAGREQ
+#endif
 
+/**
+ * Provide empty defs for those DEBUGs which has to be defined.
+ */
+#if !defined(DEBUG)
 #define DEBUG(x)
-#define DEBUG_DICT(x)
-
 #endif
 
-#define DEBUG_CRASH() ndbassert(false)
-
-#if 1
-#undef DEBUG
-#define DEBUG(x)
-#undef DEBUG_DICT
+#if !defined(DEBUG_DICT)
 #define DEBUG_DICT(x)
-#undef DEBUG_LQHKEYREQ
-#undef DEBUG_SCAN_FRAGREQ
 #endif
 
+#define DEBUG_CRASH() ndbassert(false)
+
 const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
 const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, { 0 } };
 
@@ -1289,54 +1286,92 @@ Dbspj::build(Build_context& ctx,
   }
   requestPtr.p->m_node_cnt = ctx.m_cnt;
 
+  if (ctx.m_scan_cnt > 1)
+  {
+    jam();
+    requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;
+  }
+
+  // Construct RowBuffers where required
+  err = initRowBuffers(requestPtr);
+  if (unlikely(err != 0))
+  {
+    jam();
+    goto error;
+  }
+
+  return 0;
+
+error:
+  jam();
+  return err;
+}
+
+/**
+ * initRowBuffers will decide row-buffering strategy, and init
+ * the RowBuffers where required.
+ */
+Uint32
+Dbspj::initRowBuffers(Ptr<Request> requestPtr)
+{
+  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+
   /**
-   * Init ROW_BUFFERS for those TreeNodes requiring either
-   * T_ROW_BUFFER or T_ROW_BUFFER_MAP.
+   * Init ROW_BUFFERS iff Request has to buffer any rows.
    */
   if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
   {
+    jam();
+
+    /**
+     * Iff, multi-scan is non-bushy (normal case)
+     *   we don't strictly need BUFFER_VAR for RT_ROW_BUFFERS
+     *   but could instead pop-row stack frame,
+     *     however this is not implemented...
+     *
+     * so, currently use BUFFER_VAR if 'RT_MULTI_SCAN'
+     *
+     * NOTE: This should easily be solvable by having a 
+     *       RowBuffer for each TreeNode instead
+     */
+    if (requestPtr.p->m_bits & Request::RT_MULTI_SCAN)
+    {
+      jam();
+      requestPtr.p->m_rowBuffer.init(BUFFER_VAR);
+    }
+    else
+    {
+      jam();
+      requestPtr.p->m_rowBuffer.init(BUFFER_STACK);
+    }
+
     Ptr<TreeNode> treeNodePtr;
-    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
     for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
     {
+      jam();
+      ndbassert(treeNodePtr.p->m_batch_size > 0);
+      /**
+       * Construct a List or Map RowCollection for those TreeNodes
+       * requiring rows to be buffered.
+       */
       if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
       {
         jam();
-        treeNodePtr.p->m_row_map.init();
+        treeNodePtr.p->m_rows.construct (RowCollection::COLLECTION_MAP,
+                                         requestPtr.p->m_rowBuffer,
+                                         treeNodePtr.p->m_batch_size);
       }
       else if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
       {
         jam();
-        treeNodePtr.p->m_row_list.init();
+        treeNodePtr.p->m_rows.construct (RowCollection::COLLECTION_LIST,
+                                         requestPtr.p->m_rowBuffer,
+                                         treeNodePtr.p->m_batch_size);
       }
     }
   }
 
-  if (ctx.m_scan_cnt > 1)
-  {
-    jam();
-    requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;
-
-    /**
-     * Iff, multi-scan is non-bushy (normal case)
-     *   we don't strictly need RT_VAR_ALLOC for RT_ROW_BUFFERS
-     *   but could instead pop-row stack frame,
-     *     however this is not implemented...
-     *
-     * so, use RT_VAR_ALLOC
-     */
-    if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
-    {
-      jam();
-      requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
-    }
-  }
-
   return 0;
-
-error:
-  jam();
-  return err;
 }
 
 Uint32
@@ -1859,64 +1894,46 @@ Dbspj::releaseNodeRows(Ptr<Request> requ
      << ", request: " << requestPtr.i
   );
 
-  // only when var-alloc, or else stack will be popped wo/ consideration
-  // to individual rows
-  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
   ndbassert(treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER);
 
-  /**
-   * Two ways to iterate...
-   */
-  if ((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0)
+  Uint32 cnt = 0;
+  RowIterator iter;
+  for (first(treeNodePtr.p->m_rows, iter); !iter.isNull(); )
   {
     jam();
-    Uint32 cnt = 0;
-    SLFifoRowListIterator iter;
-    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
-    {
-      jam();
-      RowRef pos = iter.m_ref;
-      next(iter);
-      releaseRow(requestPtr, pos);
-      cnt ++;
-    }
-    treeNodePtr.p->m_row_list.init();
-    DEBUG("SLFifoRowListIterator: released " << cnt << " rows!");
+    RowRef pos = iter.m_base.m_ref;
+    next(iter);
+    releaseRow(treeNodePtr.p->m_rows, pos);
+    cnt ++;
   }
-  else
+  treeNodePtr.p->m_rows.init();
+  DEBUG("RowIterator: released " << cnt << " rows!");
+
+  if (treeNodePtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP)
   {
     jam();
-    Uint32 cnt = 0;
-    RowMapIterator iter;
-    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
-    {
-      jam();
-      RowRef pos = iter.m_ref;
-      // this could be made more efficient by not actually seting up m_row_ptr
-      next(iter);
-      releaseRow(requestPtr, pos);
-      cnt++;
-    }
-
     // Release the (now empty) RowMap
-    RowMap& map = treeNodePtr.p->m_row_map;
+    RowMap& map = treeNodePtr.p->m_rows.m_map;
     if (!map.isNull())
     {
       jam();
       RowRef ref;
       map.copyto(ref);
-      releaseRow(requestPtr, ref);  // Map was allocated in row memory
-      map.init();
+      releaseRow(treeNodePtr.p->m_rows, ref);  // Map was allocated in row memory
     }
-    DEBUG("RowMapIterator: released " << cnt << " rows!");
   }
 }
 
 void
-Dbspj::releaseRow(Ptr<Request> requestPtr, RowRef pos)
+Dbspj::releaseRow(RowCollection& collection, RowRef pos)
 {
-  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
-  ndbassert(pos.m_allocator == 1);
+  // only when var-alloc, or else stack will be popped wo/ consideration
+  // to individual rows
+  ndbassert(collection.m_base.m_rowBuffer != NULL);
+  ndbassert(collection.m_base.m_rowBuffer->m_type == BUFFER_VAR);
+  ndbassert(pos.m_alloc_type == BUFFER_VAR);
+
+  RowBuffer& rowBuffer = *collection.m_base.m_rowBuffer;
   Ptr<RowPage> ptr;
   m_page_pool.getPtr(ptr, pos.m_page_id);
   ((Var_page*)ptr.p)->free_record(pos.m_page_pos, Var_page::CHAIN);
@@ -1925,7 +1942,7 @@ Dbspj::releaseRow(Ptr<Request> requestPt
   {
     jam();
     LocalDLFifoList<RowPage> list(m_page_pool,
-                                  requestPtr.p->m_rowBuffer.m_page_list);
+                                  rowBuffer.m_page_list);
     const bool last = list.hasNext(ptr) == false;
     list.remove(ptr);
     if (list.isEmpty())
@@ -1935,7 +1952,7 @@ Dbspj::releaseRow(Ptr<Request> requestPt
        * Don't remove last page...
        */
       list.addLast(ptr);
-      requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
+      rowBuffer.m_var.m_free = free_space;
     }
     else
     {
@@ -1948,20 +1965,19 @@ Dbspj::releaseRow(Ptr<Request> requestPt
          */
         Ptr<RowPage> newLastPtr;
         ndbrequire(list.last(newLastPtr));
-        requestPtr.p->m_rowBuffer.m_var.m_free =
-          ((Var_page*)newLastPtr.p)->free_space;
+        rowBuffer.m_var.m_free = ((Var_page*)newLastPtr.p)->free_space;
       }
       releasePage(ptr);
     }
   }
-  else if (free_space > requestPtr.p->m_rowBuffer.m_var.m_free)
+  else if (free_space > rowBuffer.m_var.m_free)
   {
     jam();
     LocalDLFifoList<RowPage> list(m_page_pool,
-                                  requestPtr.p->m_rowBuffer.m_page_list);
+                                  rowBuffer.m_page_list);
     list.remove(ptr);
     list.addLast(ptr);
-    requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
+    rowBuffer.m_var.m_free = free_space;
   }
 }
 
@@ -1988,7 +2004,7 @@ Dbspj::releaseRequestBuffers(Ptr<Request
         list.remove();
       }
     }
-    requestPtr.p->m_rowBuffer.stack_init();
+    requestPtr.p->m_rowBuffer.reset();
   }
 
   if (reset)
@@ -1998,19 +2014,7 @@ Dbspj::releaseRequestBuffers(Ptr<Request
     for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
     {
       jam();
-      if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
-      {
-        jam();
-        if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
-        {
-          jam();
-          nodePtr.p->m_row_map.init();
-        }
-        else
-        {
-          nodePtr.p->m_row_list.init();
-        }
-      }
+      nodePtr.p->m_rows.init();
     }
   }
 }
@@ -2569,6 +2573,11 @@ Dbspj::execTRANSID_AI(Signal* signal)
   {
     jam();
     Uint32 err;
+
+    DEBUG("Need to storeRow"
+      << ", node: " << treeNodePtr.p->m_node_no
+    );
+
     if (ERROR_INSERTED(17120) ||
        (ERROR_INSERTED(17121) && treeNodePtr.p->m_parentPtrI != RNIL))
     {
@@ -2576,7 +2585,7 @@ Dbspj::execTRANSID_AI(Signal* signal)
       CLEAR_ERROR_INSERT_VALUE;
       abort(signal, requestPtr, DbspjErr::OutOfRowMemory);
     }
-    else if ((err = storeRow(requestPtr, treeNodePtr, row)) != 0)
+    else if ((err = storeRow(treeNodePtr.p->m_rows, row)) != 0)
     {
       jam();
       abort(signal, requestPtr, err);
@@ -2593,60 +2602,43 @@ Dbspj::execTRANSID_AI(Signal* signal)
 }
 
 Uint32
-Dbspj::storeRow(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, RowPtr &row)
+Dbspj::storeRow(RowCollection& collection, RowPtr &row)
 {
   ndbassert(row.m_type == RowPtr::RT_SECTION);
   SegmentedSectionPtr dataPtr = row.m_row_data.m_section.m_dataPtr;
   Uint32 * headptr = (Uint32*)row.m_row_data.m_section.m_header;
   Uint32 headlen = 1 + row.m_row_data.m_section.m_header->m_len;
 
-  DEBUG("storeRow"
-     << ", node: " << treeNodePtr.p->m_node_no
-     << ", request: " << requestPtr.i
-  );
-
   /**
-   * If rows are not in map, then they are kept in linked list
+   * Rows might be stored at an offset within the collection.
    */
-  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
-    0 : 2;
+  const Uint32 offset = collection.rowOffset();
 
   Uint32 totlen = 0;
   totlen += dataPtr.sz;
   totlen += headlen;
-  totlen += linklen;
+  totlen += offset;
 
   RowRef ref;
-  Uint32 * dstptr = 0;
-  if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
-  {
-    jam();
-    dstptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
-  }
-  else
-  {
-    jam();
-    dstptr = varAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
-  }
-
+  Uint32* const dstptr = rowAlloc(*collection.m_base.m_rowBuffer, ref, totlen);
   if (unlikely(dstptr == 0))
   {
     jam();
     return DbspjErr::OutOfRowMemory;
   }
-  memcpy(dstptr + linklen, headptr, 4 * headlen);
-  copy(dstptr + linklen + headlen, dataPtr);
+  memcpy(dstptr + offset, headptr, 4 * headlen);
+  copy(dstptr + offset + headlen, dataPtr);
 
-  if (linklen)
+  if (collection.m_type == RowCollection::COLLECTION_LIST)
   {
     jam();
     NullRowRef.copyto_link(dstptr); // Null terminate list...
-    add_to_list(treeNodePtr.p->m_row_list, ref);
+    add_to_list(collection.m_list, ref);
   }
   else
   {
     jam();
-    Uint32 error = add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
+    Uint32 error = add_to_map(collection.m_map, row.m_src_correlation, ref);
     if (unlikely(error))
       return error;
   }
@@ -2656,30 +2648,17 @@ Dbspj::storeRow(Ptr<Request> requestPtr,
    * as above add_to_xxx may mave reorganized memory causing
    * alloced row to be moved.
    */
-  Uint32 * rowptr = 0;
-  if (ref.m_allocator == 0)
-  {
-    jam();
-    rowptr = get_row_ptr_stack(ref);
-  }
-  else
-  {
-    jam();
-    rowptr = get_row_ptr_var(ref);
-  }
-
-//ndbrequire(rowptr==dstptr);  // It moved which we now do handle
-  setupRowPtr(treeNodePtr, row, ref, rowptr);
+  const Uint32* const rowptr = get_row_ptr(ref);
+  setupRowPtr(collection, row, ref, rowptr);
   return 0;
 }
 
 void
-Dbspj::setupRowPtr(Ptr<TreeNode> treeNodePtr,
+Dbspj::setupRowPtr(const RowCollection& collection,
                    RowPtr& row, RowRef ref, const Uint32 * src)
 {
-  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
-    0 : 2;
-  const RowPtr::Header * headptr = (RowPtr::Header*)(src + linklen);
+  const Uint32 offset = collection.rowOffset();
+  const RowPtr::Header * headptr = (RowPtr::Header*)(src + offset);
   Uint32 headlen = 1 + headptr->m_len;
 
   row.m_type = RowPtr::RT_LINEAR;
@@ -2704,20 +2683,10 @@ Dbspj::add_to_list(SLFifoRowList & list,
      * add last to list
      */
     RowRef last;
-    last.m_allocator = rowref.m_allocator;
+    last.m_alloc_type = rowref.m_alloc_type;
     last.m_page_id = list.m_last_row_page_id;
     last.m_page_pos = list.m_last_row_page_pos;
-    Uint32 * rowptr;
-    if (rowref.m_allocator == 0)
-    {
-      jam();
-      rowptr = get_row_ptr_stack(last);
-    }
-    else
-    {
-      jam();
-      rowptr = get_row_ptr_var(last);
-    }
+    Uint32 * const rowptr = get_row_ptr(last);
     rowref.copyto_link(rowptr);
   }
 
@@ -2726,29 +2695,28 @@ Dbspj::add_to_list(SLFifoRowList & list,
 }
 
 Uint32 *
-Dbspj::get_row_ptr_stack(RowRef pos)
+Dbspj::get_row_ptr(RowRef pos)
 {
-  ndbassert(pos.m_allocator == 0);
   Ptr<RowPage> ptr;
   m_page_pool.getPtr(ptr, pos.m_page_id);
-  return ptr.p->m_data + pos.m_page_pos;
-}
-
-Uint32 *
-Dbspj::get_row_ptr_var(RowRef pos)
-{
-  ndbassert(pos.m_allocator == 1);
-  Ptr<RowPage> ptr;
-  m_page_pool.getPtr(ptr, pos.m_page_id);
-  return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
+  if (pos.m_alloc_type == BUFFER_STACK) // ::stackAlloc() memory
+  {
+    jam();
+    return ptr.p->m_data + pos.m_page_pos;
+  }
+  else                                 // ::varAlloc() memory
+  {
+    jam();
+    ndbassert(pos.m_alloc_type == BUFFER_VAR);
+    return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
+  }
 }
 
+inline
 bool
-Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
+Dbspj::first(const SLFifoRowList& list,
              SLFifoRowListIterator& iter)
 {
-  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
-  SLFifoRowList & list = treeNodePtr.p->m_row_list;
   if (list.isNull())
   {
     jam();
@@ -2756,23 +2724,15 @@ Dbspj::first(Ptr<Request> requestPtr, Pt
     return false;
   }
 
-  iter.m_ref.m_allocator = var;
+  //  const Buffer_type allocator = list.m_rowBuffer->m_type;
+  iter.m_ref.m_alloc_type = list.m_rowBuffer->m_type;
   iter.m_ref.m_page_id = list.m_first_row_page_id;
   iter.m_ref.m_page_pos = list.m_first_row_page_pos;
-  if (var == 0)
-  {
-    jam();
-    iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
-  }
-  else
-  {
-    jam();
-    iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
-  }
-
+  iter.m_row_ptr = get_row_ptr(iter.m_ref);
   return true;
 }
 
+inline
 bool
 Dbspj::next(SLFifoRowListIterator& iter)
 {
@@ -2782,63 +2742,25 @@ Dbspj::next(SLFifoRowListIterator& iter)
     jam();
     return false;
   }
-
-  if (iter.m_ref.m_allocator == 0)
-  {
-    jam();
-    iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
-  }
-  else
-  {
-    jam();
-    iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
-  }
+  iter.m_row_ptr = get_row_ptr(iter.m_ref);
   return true;
 }
 
-bool
-Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
-            SLFifoRowListIterator& iter, SLFifoRowListIteratorPtr start)
-{
-  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
-  (void)var;
-  ndbassert(var == iter.m_ref.m_allocator);
-  if (iter.m_ref.m_allocator == 0)
-  {
-    jam();
-    iter.m_row_ptr = get_row_ptr_stack(start.m_ref);
-  }
-  else
-  {
-    jam();
-    iter.m_row_ptr = get_row_ptr_var(start.m_ref);
-  }
-  return next(iter);
-}
-
 Uint32
-Dbspj::add_to_map(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
+Dbspj::add_to_map(RowMap& map,
                   Uint32 corrVal, RowRef rowref)
 {
   Uint32 * mapptr;
-  RowMap& map = treeNodePtr.p->m_row_map;
   if (map.isNull())
   {
     jam();
-    Uint16 batchsize = treeNodePtr.p->m_batch_size;
-    Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * batchsize;
+    ndbassert(map.m_size > 0);
+    ndbassert(map.m_rowBuffer != NULL);
+
+    Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * map.m_size;
     Uint32 sz32 = (sz16 + 1) / 2;
     RowRef ref;
-    if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
-    {
-      jam();
-      mapptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
-    }
-    else
-    {
-      jam();
-      mapptr = varAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
-    }
+    mapptr = rowAlloc(*map.m_rowBuffer, ref, sz32);
     if (unlikely(mapptr == 0))
     {
       jam();
@@ -2846,7 +2768,6 @@ Dbspj::add_to_map(Ptr<Request> requestPt
     }
     map.assign(ref);
     map.m_elements = 0;
-    map.m_size = batchsize;
     map.clear(mapptr);
   }
   else
@@ -2854,16 +2775,7 @@ Dbspj::add_to_map(Ptr<Request> requestPt
     jam();
     RowRef ref;
     map.copyto(ref);
-    if (ref.m_allocator == 0)
-    {
-      jam();
-      mapptr = get_row_ptr_stack(ref);
-    }
-    else
-    {
-      jam();
-      mapptr = get_row_ptr_var(ref);
-    }
+    mapptr = get_row_ptr(ref);
   }
 
   Uint32 pos = corrVal & 0xFFFF;
@@ -2885,12 +2797,11 @@ Dbspj::add_to_map(Ptr<Request> requestPt
   return 0;
 }
 
+inline
 bool
-Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
+Dbspj::first(const RowMap& map,
              RowMapIterator & iter)
 {
-  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
-  RowMap& map = treeNodePtr.p->m_row_map;
   if (map.isNull())
   {
     jam();
@@ -2898,18 +2809,9 @@ Dbspj::first(Ptr<Request> requestPtr, Pt
     return false;
   }
 
-  if (var == 0)
-  {
-    jam();
-    iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
-  }
-  else
-  {
-    jam();
-    iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
-  }
+  iter.m_map_ptr = get_row_ptr(map.m_map_ref);
   iter.m_size = map.m_size;
-  iter.m_ref.m_allocator = var;
+  iter.m_ref.m_alloc_type = map.m_rowBuffer->m_type;
 
   Uint32 pos = 0;
   while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
@@ -2926,20 +2828,12 @@ Dbspj::first(Ptr<Request> requestPtr, Pt
     jam();
     RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
     iter.m_element_no = pos;
-    if (var == 0)
-    {
-      jam();
-      iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
-    }
-    else
-    {
-      jam();
-      iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
-    }
+    iter.m_row_ptr = get_row_ptr(iter.m_ref);
     return true;
   }
 }
 
+inline
 bool
 Dbspj::next(RowMapIterator & iter)
 {
@@ -2958,45 +2852,46 @@ Dbspj::next(RowMapIterator & iter)
     jam();
     RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
     iter.m_element_no = pos;
-    if (iter.m_ref.m_allocator == 0)
-    {
-      jam();
-      iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
-    }
-    else
-    {
-      jam();
-      iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
-    }
+    iter.m_row_ptr = get_row_ptr(iter.m_ref);
     return true;
   }
 }
 
 bool
-Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
-            RowMapIterator & iter, RowMapIteratorPtr start)
+Dbspj::first(const RowCollection& collection,
+             RowIterator& iter)
 {
-  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
-  RowMap& map = treeNodePtr.p->m_row_map;
-  ndbrequire(!map.isNull());
-
-  if (var == 0)
+  iter.m_type = collection.m_type;
+  if (iter.m_type == RowCollection::COLLECTION_LIST)
   {
     jam();
-    iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
+    return first(collection.m_list, iter.m_list);
   }
   else
   {
     jam();
-    iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
+    ndbassert(iter.m_type == RowCollection::COLLECTION_MAP);
+    return first(collection.m_map, iter.m_map);
   }
-  iter.m_size = map.m_size;
+}
 
-  RowMap::load(iter.m_map_ptr, start.m_element_no, iter.m_ref);
-  iter.m_element_no = start.m_element_no;
-  return next(iter);
+bool
+Dbspj::next(RowIterator& iter)
+{
+  if (iter.m_type == RowCollection::COLLECTION_LIST)
+  {
+    jam();
+    return next(iter.m_list);
+  }
+  else
+  {
+    jam();
+    ndbassert(iter.m_type == RowCollection::COLLECTION_MAP);
+    return next(iter.m_map);
+  }
 }
 
+inline
 Uint32 *
 Dbspj::stackAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
 {
@@ -3025,11 +2920,12 @@ Dbspj::stackAlloc(RowBuffer & buffer, Ro
 
   dst.m_page_id = ptr.i;
   dst.m_page_pos = pos;
-  dst.m_allocator = 0;
+  dst.m_alloc_type = BUFFER_STACK;
   buffer.m_stack.m_pos = pos + sz;
   return ptr.p->m_data + pos;
 }
 
+inline
 Uint32 *
 Dbspj::varAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
 {
@@ -3061,11 +2957,32 @@ Dbspj::varAlloc(RowBuffer & buffer, RowR
 
   dst.m_page_id = ptr.i;
   dst.m_page_pos = pos;
-  dst.m_allocator = 1;
+  dst.m_alloc_type = BUFFER_VAR;
   buffer.m_var.m_free = vp->free_space;
   return vp->get_ptr(pos);
 }
 
+Uint32 *
+Dbspj::rowAlloc(RowBuffer& rowBuffer, RowRef& dst, Uint32 sz)
+{
+  if (rowBuffer.m_type == BUFFER_STACK)
+  {
+    jam();
+    return stackAlloc(rowBuffer, dst, sz);
+  }
+  else if (rowBuffer.m_type == BUFFER_VAR)
+  {
+    jam();
+    return varAlloc(rowBuffer, dst, sz);
+  }
+  else
+  {
+    jam();
+    ndbrequire(false);
+    return NULL;
+  }
+}
+
 bool
 Dbspj::allocPage(Ptr<RowPage> & ptr)
 {
@@ -7523,49 +7440,28 @@ Dbspj::appendFromParent(Uint32 & dst, Lo
     m_treenode_pool.getPtr(treeNodePtr, treeNodePtr.p->m_parentPtrI);
     DEBUG("appendFromParent"
        << ", node: " << treeNodePtr.p->m_node_no);
-    if (unlikely((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0))
+    if (unlikely(treeNodePtr.p->m_rows.m_type != RowCollection::COLLECTION_MAP))
     {
       DEBUG_CRASH();
       return DbspjErr::InvalidPattern;
     }
 
     RowRef ref;
-    treeNodePtr.p->m_row_map.copyto(ref);
-    Uint32 allocator = ref.m_allocator;
-    const Uint32 * mapptr;
-    if (allocator == 0)
-    {
-      jam();
-      mapptr = get_row_ptr_stack(ref);
-    }
-    else
-    {
-      jam();
-      mapptr = get_row_ptr_var(ref);
-    }
+    treeNodePtr.p->m_rows.m_map.copyto(ref);
+    const Uint32* const mapptr = get_row_ptr(ref);
 
     Uint32 pos = corrVal >> 16; // parent corr-val
-    if (unlikely(! (pos < treeNodePtr.p->m_row_map.m_size)))
+    if (unlikely(! (pos < treeNodePtr.p->m_rows.m_map.m_size)))
     {
       DEBUG_CRASH();
       return DbspjErr::InvalidPattern;
     }
 
     // load ref to parent row
-    treeNodePtr.p->m_row_map.load(mapptr, pos, ref);
+    treeNodePtr.p->m_rows.m_map.load(mapptr, pos, ref);
 
-    const Uint32 * rowptr;
-    if (allocator == 0)
-    {
-      jam();
-      rowptr = get_row_ptr_stack(ref);
-    }
-    else
-    {
-      jam();
-      rowptr = get_row_ptr_var(ref);
-    }
-    setupRowPtr(treeNodePtr, targetRow, ref, rowptr);
+    const Uint32* const rowptr = get_row_ptr(ref);
+    setupRowPtr(treeNodePtr.p->m_rows, targetRow, ref, rowptr);
 
     if (levels)
     {

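Taken together, the DbspjMain.cpp hunks above replace the per-call-site branching on Request::RT_VAR_ALLOC and RowRef::m_allocator with a single allocation type carried by the buffer and the reference: rowAlloc() picks stackAlloc()/varAlloc() from RowBuffer::m_type, and get_row_ptr() dispatches on RowRef::m_alloc_type, so callers such as storeRow(), add_to_map() and the iterators no longer need to know which allocator backs a row. A minimal standalone sketch of that dispatch pattern follows -- simplified, invented types for illustration only, not the actual block code (which keeps distinct fixed-size and Var_page layouts and a shared page pool):

// Illustrative stand-ins for RowBuffer/RowRef/RowPage -- not NDB code.
#include <cassert>
#include <cstdint>
#include <vector>

enum BufferType { BUFFER_STACK, BUFFER_VAR };

struct Ref {
  BufferType m_alloc_type;   // which allocator produced this reference
  uint32_t   m_page_id;
  uint32_t   m_page_pos;
};

struct Buffer {
  BufferType m_type;                         // fixed when the buffer is set up
  std::vector<std::vector<uint32_t>> m_pages;
};

// One lookup routine instead of get_row_ptr_stack()/get_row_ptr_var():
// the reference itself records how it was allocated.
uint32_t* get_row_ptr(Buffer& buf, const Ref& pos)
{
  std::vector<uint32_t>& page = buf.m_pages[pos.m_page_id];
  if (pos.m_alloc_type == BUFFER_STACK)
    return page.data() + pos.m_page_pos;     // fixed-size stack page
  assert(pos.m_alloc_type == BUFFER_VAR);
  return page.data() + pos.m_page_pos;       // a real Var_page would indirect via its directory
}

// One allocation routine instead of branching on RT_VAR_ALLOC at each caller.
uint32_t* rowAlloc(Buffer& buf, Ref& dst, uint32_t sz)
{
  buf.m_pages.emplace_back(sz);              // toy allocator: one page per row
  dst.m_alloc_type = buf.m_type;
  dst.m_page_id = static_cast<uint32_t>(buf.m_pages.size() - 1);
  dst.m_page_pos = 0;
  return get_row_ptr(buf, dst);
}

int main()
{
  Buffer buf{BUFFER_STACK, {}};
  Ref ref{};
  uint32_t* p = rowAlloc(buf, ref, 4);
  p[0] = 42;
  assert(get_row_ptr(buf, ref)[0] == 42);    // re-resolve, as storeRow() does after add_to_xxx
  return 0;
}
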
=== modified file 'storage/ndb/src/mgmsrv/testConfig.cpp'
--- a/storage/ndb/src/mgmsrv/testConfig.cpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/mgmsrv/testConfig.cpp	2012-11-12 13:42:48 +0000
@@ -302,19 +302,19 @@ print_restart_info(void)
   }
 
   fprintf(stderr, "*** initial node restart ***\n");
-  for (size_t i = 0; i < initial_node.size(); i++) {
+  for (unsigned i = 0; i < initial_node.size(); i++) {
     fprintf(stderr, "%s\n", initial_node[i]);
   }
   fprintf(stderr, "\n");
 
   fprintf(stderr, "*** system restart ***\n");
-  for (size_t i = 0; i < system.size(); i++) {
+  for (unsigned i = 0; i < system.size(); i++) {
     fprintf(stderr, "%s\n", system[i]);
   }
   fprintf(stderr, "\n");
 
   fprintf(stderr, "*** initial system restart ***\n");
-  for (size_t i = 0; i < initial_system.size(); i++) {
+  for (unsigned i = 0; i < initial_system.size(); i++) {
     fprintf(stderr, "%s\n", initial_system[i]);
   }
   fprintf(stderr, "\n");

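The testConfig.cpp change only swaps the loop counters from size_t to unsigned. Presumably (my reading, not stated in the commit) the intent is to match the width of the container's size() and silence 64-bit-to-32-bit conversion warnings on Windows builds. A self-contained sketch of the pattern, using a hypothetical stand-in container whose size() is assumed to return unsigned:

// Hypothetical stand-in; the real NDB Vector<T>::size() return type is assumed here.
#include <cstdio>

struct StringList {
  const char* m_items[3] = {"a", "b", "c"};
  unsigned size() const { return 3; }
  const char* operator[](unsigned i) const { return m_items[i]; }
};

int main()
{
  StringList initial_node;
  // An 'unsigned' index has the same width as size() and operator[](unsigned),
  // so no narrowing-conversion warning is emitted where size_t is 64-bit.
  for (unsigned i = 0; i < initial_node.size(); i++)
    fprintf(stderr, "%s\n", initial_node[i]);
  return 0;
}
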
=== modified file 'storage/ndb/test/ndbapi/testInterpreter.cpp'
--- a/storage/ndb/test/ndbapi/testInterpreter.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/test/ndbapi/testInterpreter.cpp	2012-11-12 13:26:01 +0000
@@ -209,9 +209,10 @@ int runTestBug19537(NDBT_Context* ctx, N
   // Load 64-bit constant into register 1 and
   // write from register 1 to 32-bit column KOL2
   const Uint64 reg_val = 0x0102030405060708ULL;
+#if 0
   Uint32 reg_ptr32[2];
-  memcpy(reg_ptr32+0, (Uint8*)&reg_val, sizeof(Uint32));
-  memcpy(reg_ptr32+1, ((Uint8*)&reg_val)+4, sizeof(Uint32));
+  memcpy(&(reg_ptr32[0]), (Uint8*)&reg_val, sizeof(Uint32));
+  memcpy(&(reg_ptr32[1]), ((Uint8*)&reg_val)+4, sizeof(Uint32));
   if (reg_ptr32[0] == 0x05060708 && reg_ptr32[1] == 0x01020304) {
     g_err << "runTestBug19537: platform is LITTLE endian" << endl;
   } else if (reg_ptr32[0] == 0x01020304 && reg_ptr32[1] == 0x05060708) {
@@ -222,6 +223,7 @@ int runTestBug19537(NDBT_Context* ctx, N
     pNdb->closeTransaction(pTrans);
     return NDBT_FAILED;
   }
+#endif
 
   if (pOp->load_const_u64(1, reg_val) == -1 ||
       pOp->write_attr("KOL2", 1) == -1) {

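In testInterpreter.cpp the endianness probe is compiled out with #if 0 rather than removed, so the load_const_u64()/write_attr() calls now run unconditionally. For reference, the disabled probe works because memcpy preserves the host byte order of the Uint64: the two 32-bit halves land swapped on little-endian machines and in order on big-endian ones. A standalone version of that check, for illustration only, using plain <cstdint> types:

// Standalone sketch of the (now #if 0'd) endianness probe.
#include <cstdint>
#include <cstdio>
#include <cstring>

int main()
{
  const uint64_t reg_val = 0x0102030405060708ULL;
  uint32_t reg_ptr32[2];
  // memcpy copies raw bytes, so the resulting word values reflect host byte order.
  std::memcpy(&reg_ptr32[0], reinterpret_cast<const uint8_t*>(&reg_val), sizeof(uint32_t));
  std::memcpy(&reg_ptr32[1], reinterpret_cast<const uint8_t*>(&reg_val) + 4, sizeof(uint32_t));

  if (reg_ptr32[0] == 0x05060708 && reg_ptr32[1] == 0x01020304)
    std::printf("little endian\n");
  else if (reg_ptr32[0] == 0x01020304 && reg_ptr32[1] == 0x05060708)
    std::printf("big endian\n");
  else
    std::printf("unexpected layout\n");
  return 0;
}
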
No bundle (reason: useless for push emails).