List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 9 2011 1:38pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3580 to 3581)
View as plain text  
 3581 Ole John Aske	2011-11-09 [merge]
      Merge telco-7.0 -> telco-7.0-spj-scan-scan

    modified:
      mysql-test/suite/ndb/r/ndb_index_stat.result
      sql/ha_ndb_index_stat.cc
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
      storage/ndb/src/ndbapi/NdbRecord.hpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
 3580 Ole John Aske	2011-11-09 [merge]
      Merge telco-7.0 -> telco-7.0-spj-scan-scan

    added:
      mysql-test/suite/ndb/r/ndb_multi_update_delete.result
      mysql-test/suite/ndb/t/ndb_multi_update_delete.test
    modified:
      configure.in
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
      sql/ha_ndbcluster_cond.cc
      sql/ha_ndbcluster_cond.h
      sql/sql_select.cc
      storage/ndb/ndb_configure.m4
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/vm/Rope.cpp
      storage/ndb/src/kernel/vm/Rope.hpp
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-10-13 20:08:25 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-11-09 13:38:13 +0000
@@ -21,18 +21,18 @@ Variable_name	Value
 ndb_index_stat_enable	ON
 show global variables like 'ndb_index_stat_option';
 Variable_name	Value
-ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
 set @save_option = @@global.ndb_index_stat_option;
 set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
 set @@global.ndb_index_stat_option = 'cache_lowpct=85,evict_delay=55';
 set @@global.ndb_index_stat_option = 'check_delay=234s';
 show global variables like 'ndb_index_stat_option';
 Variable_name	Value
-ndb_index_stat_option	loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85
+ndb_index_stat_option	loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85,zero_total=0
 set @@global.ndb_index_stat_option = @save_option;
 show global variables like 'ndb_index_stat_option';
 Variable_name	Value
-ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
 create table t1 (
 a1 int unsigned not null,
 b1 int unsigned not null,

=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	2011-10-20 16:18:28 +0000
+++ b/sql/ha_ndb_index_stat.cc	2011-11-08 21:43:36 +0000
@@ -75,6 +75,7 @@ struct Ndb_index_stat {
   struct Ndb_index_stat *list_next;
   struct Ndb_index_stat *list_prev;
   struct NDB_SHARE *share;
+  bool to_delete;       /* detached from share and marked for delete */
   Ndb_index_stat();
 };
 
@@ -134,7 +135,8 @@ struct Ndb_index_stat_opt {
     Umsec = 4
   };
   enum Flag {
-    Freadonly = (1 << 0)
+    Freadonly = (1 << 0),
+    Fcontrol = (1 << 1)
   };
   struct Val {
     const char* name;
@@ -161,7 +163,8 @@ struct Ndb_index_stat_opt {
     Ievict_delay = 13,
     Icache_limit = 14,
     Icache_lowpct = 15,
-    Imax = 16
+    Izero_total = 16,
+    Imax = 17
   };
   Val val[Imax];
   /* Options in string format (SYSVAR ndb_index_stat_option) */
@@ -171,6 +174,10 @@ struct Ndb_index_stat_opt {
     assert(i < Imax);
     return val[i].val;
   }
+  void set(Idx i, uint the_val) {
+    assert(i < Imax);
+    val[i].val = the_val;
+  }
 };
 
 Ndb_index_stat_opt::Ndb_index_stat_opt(char* buf) :
@@ -197,8 +204,9 @@ Ndb_index_stat_opt::Ndb_index_stat_opt(c
   ival(error_delay, 60, 0, ~0, Utime, 0);
   ival(evict_batch, 8, 1, ~0, Usize, 0);
   ival(evict_delay, 60, 0, ~0, Utime, 0);
-  ival(cache_limit, 32*1024*1024, 1024*1024, ~0, Usize, 0);
+  ival(cache_limit, 32*1024*1024, 0, ~0, Usize, 0);
   ival(cache_lowpct, 90, 0, 100, Usize, 0);
+  ival(zero_total, 0, 0, 1, Ubool, Fcontrol);
 #undef ival
 
   ndb_index_stat_opt2str(*this, option);
@@ -234,9 +242,9 @@ ndb_index_stat_opt2str(const Ndb_index_s
       {
         DBUG_ASSERT(v.val == 0 || v.val == 1);
         if (v.val == 0)
-          my_snprintf(ptr, sz, "%s%s=OFF", sep, v.name);
+          my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
         else
-          my_snprintf(ptr, sz, "%s%s=ON", sep, v.name);
+          my_snprintf(ptr, sz, "%s%s=1", sep, v.name);
       }
       break;
 
@@ -308,12 +316,14 @@ ndb_index_stat_option_parse(char* p, Ndb
   if (*r == 0)
     DBUG_RETURN(-1);
 
+  bool found= false;
   const uint imax= Ndb_index_stat_opt::Imax;
   for (uint i= 0; i < imax; i++)
   {
     Ndb_index_stat_opt::Val& v= opt.val[i];
     if (strcmp(p, v.name) != 0)
       continue;
+    found= true;
 
     char *s;
     for (s= r; *s != 0; s++)
@@ -400,6 +410,9 @@ ndb_index_stat_option_parse(char* p, Ndb
       break;
     }
   }
+
+  if (!found)
+    DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
 
@@ -503,15 +516,23 @@ struct Ndb_index_stat_glob {
   uint wait_update;
   uint no_stats;
   uint wait_stats;
+  /* Accumulating counters */
+  uint analyze_count;
+  uint analyze_error;
+  uint query_count;
+  uint query_no_stats;
+  uint query_error;
   uint event_ok;          /* Events received for known index */
   uint event_miss;        /* Events received for unknown index */
-  char status[2][512];
-  uint status_i;
+  /* Cache */
   uint cache_query_bytes; /* In use */
   uint cache_clean_bytes; /* Obsolete versions not yet removed */
+  char status[2][512];
+  uint status_i;
 
   Ndb_index_stat_glob();
   void set_status();
+  void zero_total();
 };
 
 Ndb_index_stat_glob::Ndb_index_stat_glob()
@@ -524,12 +545,17 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
   wait_update= 0;
   no_stats= 0;
   wait_stats= 0;
+  analyze_count= 0;
+  analyze_error= 0;
+  query_count= 0;
+  query_no_stats= 0;
+  query_error= 0;
   event_ok= 0;
   event_miss= 0;
-  memset(status, 0, sizeof(status));
-  status_i= 0;
   cache_query_bytes= 0;
   cache_clean_bytes= 0;
+  memset(status, 0, sizeof(status));
+  status_i= 0;
 }
 
 /* Update status variable (must hold stat_mutex) */
@@ -541,7 +567,7 @@ Ndb_index_stat_glob::set_status()
 
   // stats thread
   th_allow= ndb_index_stat_allow();
-  sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%ums",
+  sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%u",
              th_allow, th_enable, th_busy, th_loop);
   p+= strlen(p);
 
@@ -562,11 +588,19 @@ Ndb_index_stat_glob::set_status()
   // special counters
   sprintf(p, ",analyze:(queue:%u,wait:%u)", force_update, wait_update);
   p+= strlen(p);
-  sprintf(p, ",stats:(none:%u,wait:%u)", no_stats, wait_stats);
+  sprintf(p, ",stats:(nostats:%u,wait:%u)", no_stats, wait_stats);
   p+= strlen(p);
 
-  // events
-  sprintf(p, ",events:(ok:%u,miss:%u)", event_ok, event_miss);
+  // accumulating counters
+  sprintf(p, ",total:(");
+  p+= strlen(p);
+  sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
+  p+= strlen(p);
+  sprintf(p, ",query:(all:%u,nostats:%u,error:%u)", query_count, query_no_stats, query_error);
+  p+= strlen(p);
+  sprintf(p, ",event:(ok:%u,miss:%u)", event_ok, event_miss);
+  p+= strlen(p);
+  sprintf(p, ")");
   p+= strlen(p);
 
   // cache size
@@ -575,7 +609,7 @@ Ndb_index_stat_glob::set_status()
   double cache_pct= (double)0.0;
   if (cache_limit != 0)
     cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
-  sprintf(p, ",cache:(query:%u,clean:%u,total:%.2f%%)",
+  sprintf(p, ",cache:(query:%u,clean:%u,totalpct:%.2f)",
              cache_query_bytes, cache_clean_bytes, cache_pct);
   p+= strlen(p);
 
@@ -588,6 +622,19 @@ Ndb_index_stat_glob::set_status()
   pthread_mutex_unlock(&LOCK_global_system_variables);
 }
 
+/* Zero accumulating counters */
+void
+Ndb_index_stat_glob::zero_total()
+{
+  analyze_count= 0;
+  analyze_error= 0;
+  query_count= 0;
+  query_no_stats= 0;
+  query_error= 0;
+  event_ok= 0;
+  event_miss= 0;
+}
+
 Ndb_index_stat_glob ndb_index_stat_glob;
 
 /* Shared index entries */
@@ -616,6 +663,7 @@ Ndb_index_stat::Ndb_index_stat()
   list_next= 0;
   list_prev= 0;
   share= 0;
+  to_delete= false;
 }
 
 void
@@ -908,9 +956,15 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   return st;
 }
 
+/*
+  Prepare to delete index stat entry.  Remove it from per-share
+  list and set "to_delete" flag.  Stats thread does real delete.
+*/
+
 void
 ndb_index_stat_free(Ndb_index_stat *st)
 {
+  DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   pthread_mutex_lock(&ndb_index_stat_list_mutex);
   NDB_SHARE *share= st->share;
@@ -924,10 +978,13 @@ ndb_index_stat_free(Ndb_index_stat *st)
   {
     if (st == st_loop)
     {
+      DBUG_PRINT("index_stat", ("st %s stat free one", st->id));
+      st->share_next= 0;
       st->share= 0;
       assert(st->lt != 0);
       assert(st->lt != Ndb_index_stat::LT_Delete);
-      ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+      assert(!st->to_delete);
+      st->to_delete= true;
       st_loop= st_loop->share_next;
       assert(!found);
       found++;
@@ -950,26 +1007,32 @@ ndb_index_stat_free(Ndb_index_stat *st)
   glob.set_status();
   pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
   pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  DBUG_VOID_RETURN;
 }
 
 void
 ndb_index_stat_free(NDB_SHARE *share)
 {
+  DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   pthread_mutex_lock(&ndb_index_stat_list_mutex);
   Ndb_index_stat *st;
   while ((st= share->index_stat_list) != 0)
   {
+    DBUG_PRINT("index_stat", ("st %s stat free all", st->id));
     share->index_stat_list= st->share_next;
+    st->share_next= 0;
     st->share= 0;
     assert(st->lt != 0);
     assert(st->lt != Ndb_index_stat::LT_Delete);
-    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+    assert(!st->to_delete);
+    st->to_delete= true;
   }
   pthread_mutex_lock(&ndb_index_stat_stat_mutex);
   glob.set_status();
   pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
   pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  DBUG_VOID_RETURN;
 }
 
 /* Find entry across shares */
@@ -1221,9 +1284,15 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
     st->check_time == 0 ? 0 : st->check_time + check_delay - pr.now;
 
   DBUG_PRINT("index_stat", ("st %s check wait:%lds force update:%u"
-                            " clean wait:%lds cache clean:%d",
+                            " clean wait:%lds cache clean:%d to delete:%d",
                             st->id, (long)check_wait, st->force_update,
-                            (long)clean_wait, st->cache_clean));
+                            (long)clean_wait, st->cache_clean, st->to_delete));
+
+  if (st->to_delete)
+  {
+    pr.lt= Ndb_index_stat::LT_Delete;
+    return;
+  }
 
   if (!st->cache_clean && clean_wait <= 0)
   {
@@ -1493,6 +1562,13 @@ ndb_index_stat_proc_delete(Ndb_index_sta
     Ndb_index_stat *st= st_loop;
     st_loop= st_loop->list_next;
     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+
+    // adjust global counters at drop
+    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    ndb_index_stat_force_update(st, false);
+    ndb_index_stat_no_stats(st, false);
+    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+
     ndb_index_stat_proc_evict(pr, st);
     ndb_index_stat_list_remove(st);
     delete st->is;
@@ -1514,6 +1590,12 @@ ndb_index_stat_proc_error(Ndb_index_stat
   const int error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
   const time_t error_wait= st->error_time + error_delay - pr.now;
 
+  if (st->to_delete)
+  {
+    pr.lt= Ndb_index_stat::LT_Delete;
+    return;
+  }
+
   if (error_wait <= 0 ||
       /* Analyze issued after previous error */
       st->force_update)
@@ -1638,8 +1720,62 @@ ndb_index_stat_proc_event(Ndb_index_stat
   pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
 }
 
+/* Control options */
+
+void
+ndb_index_stat_proc_control(Ndb_index_stat_proc &pr)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+
+  /* Request to zero accumulating counters */
+  if (opt.get(Ndb_index_stat_opt::Izero_total) == true)
+  {
+    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    glob.zero_total();
+    glob.set_status();
+    opt.set(Ndb_index_stat_opt::Izero_total, false);
+    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  }
+}
+
 #ifndef DBUG_OFF
 void
+ndb_index_stat_entry_verify(const Ndb_index_stat *st)
+{
+  const NDB_SHARE *share= st->share;
+  if (st->to_delete)
+  {
+    assert(st->share_next == 0);
+    assert(share == 0);
+  }
+  else
+  {
+    assert(share != 0);
+    const Ndb_index_stat *st2= share->index_stat_list;
+    assert(st2 != 0);
+    uint found= 0;
+    while (st2 != 0)
+    {
+      assert(st2->share == share);
+      const Ndb_index_stat *st3= st2->share_next;
+      uint guard= 0;
+      while (st3 != 0)
+      {
+        assert(st2 != st3);
+        guard++;
+        assert(guard <= 1000); // MAX_INDEXES
+        st3= st3->share_next;
+      }
+      if (st == st2)
+        found++;
+      st2= st2->share_next;
+    }
+    assert(found == 1);
+  }
+}
+
+void
 ndb_index_stat_list_verify(int lt)
 {
   const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
@@ -1684,6 +1820,7 @@ ndb_index_stat_list_verify(int lt)
       assert(guard <= list.count);
       st2= st2->list_next;
     }
+    ndb_index_stat_entry_verify(st);
     st= st->list_next;
   }
   assert(count == list.count);
@@ -1717,6 +1854,9 @@ void
 ndb_index_stat_proc(Ndb_index_stat_proc &pr)
 {
   DBUG_ENTER("ndb_index_stat_proc");
+
+  ndb_index_stat_proc_control(pr);
+
 #ifndef DBUG_OFF
   ndb_index_stat_list_verify();
   Ndb_index_stat_glob old_glob= ndb_index_stat_glob;
@@ -2161,9 +2301,15 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     if (count == 0)
     {
       if (!from_analyze)
+      {
         glob.wait_stats++;
+        glob.query_count++;
+      }
       else
+      {
         glob.wait_update++;
+        glob.analyze_count++;
+      }
       if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
       {
         err= Ndb_index_stat_error_HAS_ERROR;
@@ -2175,12 +2321,17 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     {
       /* Have detected no stats now or before */
       err= NdbIndexStat::NoIndexStats;
+      glob.query_no_stats++;
       break;
     }
     if (st->error.code != 0)
     {
       /* A new error has occured */
       err= st->error.code;
+      if (!from_analyze)
+        glob.query_error++;
+      else
+        glob.analyze_error++;
       break;
     }
     if (st->sample_version > sample_version)

=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-08-17 12:53:58 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-11-09 13:38:13 +0000
@@ -105,16 +105,13 @@ private:
 
   static
   void calculate_batch_size(const NdbImpl&,
-                            const NdbRecord *,
-                            const NdbRecAttr *first_rec_attr,
-                            Uint32, Uint32, Uint32&, Uint32&, Uint32&);
-
-  void calculate_batch_size(Uint32 key_size,
                             Uint32 parallelism,
                             Uint32& batch_size,
-                            Uint32& batch_byte_size,
-                            Uint32& first_batch_size,
-                            const NdbRecord *rec) const;
+                            Uint32& batch_byte_size);
+
+  void calculate_batch_size(Uint32 parallelism,
+                            Uint32& batch_size,
+                            Uint32& batch_byte_size) const;
 
   /*
     Set up buffers for receiving TRANSID_AI and KEYINFO20 signals

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-11-09 09:53:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-11-09 13:38:13 +0000
@@ -5215,6 +5215,7 @@ Dbspj::scanIndex_send(Signal* signal,
   jam();
   ndbassert(bs_bytes > 0);
   ndbassert(bs_rows > 0);
+  ndbassert(bs_rows <= bs_bytes);
   /**
    * if (m_bits & prunemask):
    * - Range keys sliced out to each ScanFragHandle
@@ -5556,37 +5557,43 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
         org->batch_size_rows / data.m_parallelism * (data.m_parallelism - 1)
         + data.m_totalRows;
       
-      // Number of rows that we can still fetch in this batch.
+      // Number of rows & bytes that we can still fetch in this batch.
       const Int32 remainingRows 
         = static_cast<Int32>(org->batch_size_rows - maxCorrVal);
-      
+      const Int32 remainingBytes 
+        = static_cast<Int32>(org->batch_size_bytes - data.m_totalBytes);
+
       if (remainingRows >= data.m_frags_not_started &&
+          remainingBytes >= data.m_frags_not_started &&
           /**
            * Check that (remaning row capacity)/(remaining fragments) is 
            * greater or equal to (rows read so far)/(finished fragments).
            */
           remainingRows * static_cast<Int32>(data.m_parallelism) >=
-          static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
-          (org->batch_size_bytes - data.m_totalBytes) * data.m_parallelism >=
-          data.m_totalBytes * data.m_frags_not_started)
+            static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
+          remainingBytes * static_cast<Int32>(data.m_parallelism) >=
+            static_cast<Int32>(data.m_totalBytes * data.m_frags_not_started))
       {
         jam();
         Uint32 batchRange = maxCorrVal;
+        Uint32 bs_rows  = remainingRows / data.m_frags_not_started;
+        Uint32 bs_bytes = remainingBytes / data.m_frags_not_started;
+
         DEBUG("::scanIndex_execSCAN_FRAGCONF() first batch was not full."
               " Asking for new batches from " << data.m_frags_not_started <<
               " fragments with " << 
-              remainingRows / data.m_frags_not_started 
-              <<" rows and " << 
-              (org->batch_size_bytes - data.m_totalBytes)
-              / data.m_frags_not_started 
-              << " bytes.");
+              bs_rows  <<" rows and " << 
+              bs_bytes << " bytes.");
+
+        if (unlikely(bs_rows > bs_bytes))
+          bs_rows = bs_bytes;
+
         scanIndex_send(signal,
                        requestPtr,
                        treeNodePtr,
                        data.m_frags_not_started,
-                       (org->batch_size_bytes - data.m_totalBytes)
-                       / data.m_frags_not_started,
-                       remainingRows / data.m_frags_not_started,
+                       bs_bytes,
+                       bs_rows,
                        batchRange);
         return;
       }

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2011-10-21 08:59:23 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2011-11-09 13:10:53 +0000
@@ -6795,8 +6795,6 @@ NdbDictionaryImpl::initialiseColumnData(
   recCol->orgAttrSize= col->m_orgAttrSize;
   if (recCol->offset+recCol->maxSize > rec->m_row_size)
     rec->m_row_size= recCol->offset+recCol->maxSize;
-  /* Round data size to whole words + 4 bytes of AttributeHeader. */
-  rec->m_max_transid_ai_bytes+= (recCol->maxSize+7) & ~3;
   recCol->charset_info= col->m_cs;
   recCol->compare_function= NdbSqlUtil::getType(col->m_type).m_cmp;
   recCol->flags= 0;
@@ -6985,7 +6983,6 @@ NdbDictionaryImpl::createRecord(const Nd
   }
 
   rec->m_row_size= 0;
-  rec->m_max_transid_ai_bytes= 0;
   for (i= 0; i<length; i++)
   {
     const NdbDictionary::RecordSpecification *rs= &recSpec[i];

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-09-19 08:13:58 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-11-08 21:37:30 +0000
@@ -878,6 +878,11 @@ NdbIndexStatImpl::sys_read_head(Con& con
     return -1;
   if (sys_head_getvalue(con) == -1)
     return -1;
+  if (con.m_op->setAbortOption(NdbOperation::AbortOnError) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
   if (con.execute(commit) == -1)
   {
     setError(con, __LINE__);

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-10-28 13:45:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-11-09 13:38:13 +0000
@@ -3058,20 +3058,16 @@ NdbQueryImpl::doSend(int nodeId, bool la
     scanTabReq->transId2 = (Uint32) (transId >> 32);
 
     Uint32 batchRows = root.getMaxBatchRows();
-    Uint32 batchByteSize, firstBatchRows;
+    Uint32 batchByteSize;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
-                                      root.m_ndbRecord,
-                                      root.m_firstRecAttr,
-                                      0, // Key size.
                                       getRootFragCount(),
                                       batchRows,
-                                      batchByteSize,
-                                      firstBatchRows);
+                                      batchByteSize);
     assert(batchRows==root.getMaxBatchRows());
-    assert(batchRows==firstBatchRows);
+    assert(batchRows<=batchByteSize);
     ScanTabReq::setScanBatch(reqInfo, batchRows);
     scanTabReq->batch_byte_size = batchByteSize;
-    scanTabReq->first_batch_size = firstBatchRows;
+    scanTabReq->first_batch_size = batchRows;
 
     ScanTabReq::setViaSPJFlag(reqInfo, 1);
     ScanTabReq::setPassAllConfsFlag(reqInfo, 1);
@@ -4361,11 +4357,11 @@ NdbQueryOperationImpl
      * We must thus make sure that we do not set a batch size for the scan 
      * that exceeds what any of its scan descendants can use.
      *
-     * Ignore calculated 'batchByteSize' and 'firstBatchRows' 
+     * Ignore calculated 'batchByteSize' 
      * here - Recalculated when building signal after max-batchRows has been 
      * determined.
      */
-    Uint32 batchByteSize, firstBatchRows;
+    Uint32 batchByteSize;
     /**
      * myClosestScan->m_maxBatchRows may be zero to indicate that we
      * should use default values, or non-zero if the application had an 
@@ -4373,18 +4369,14 @@ NdbQueryOperationImpl
      */
     maxBatchRows = myClosestScan->m_maxBatchRows;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
-                                      m_ndbRecord,
-                                      m_firstRecAttr,
-                                      0, // Key size.
                                       getRoot().m_parallelism
-                                      == Parallelism_max ?
-                                      m_queryImpl.getRootFragCount() :
-                                      getRoot().m_parallelism,
+                                      == Parallelism_max
+                                      ? m_queryImpl.getRootFragCount()
+                                      : getRoot().m_parallelism,
                                       maxBatchRows,
-                                      batchByteSize,
-                                      firstBatchRows);
+                                      batchByteSize);
     assert(maxBatchRows > 0);
-    assert(firstBatchRows == maxBatchRows);
+    assert(maxBatchRows <= batchByteSize);
   }
 
   // Find the largest value that is acceptable to all lookup descendants.
@@ -4554,17 +4546,13 @@ NdbQueryOperationImpl::prepareAttrInfo(U
     Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
 
     Uint32 batchRows = getMaxBatchRows();
-    Uint32 batchByteSize, firstBatchRows;
+    Uint32 batchByteSize;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
-                                      m_ndbRecord,
-                                      m_firstRecAttr,
-                                      0, // Key size.
                                       m_queryImpl.getRootFragCount(),
                                       batchRows,
-                                      batchByteSize,
-                                      firstBatchRows);
-    assert(batchRows == firstBatchRows);
+                                      batchByteSize);
     assert(batchRows == getMaxBatchRows());
+    assert(batchRows <= batchByteSize);
     assert(m_parallelism == Parallelism_max ||
            m_parallelism == Parallelism_adaptive);
     if (m_parallelism == Parallelism_max)

=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-08-17 12:53:58 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-11-09 13:38:13 +0000
@@ -155,88 +155,57 @@ NdbReceiver::prepareRead(char *buf, Uint
   Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
   use, taking into account limits in the transporter, user preference, etc.
 
-  Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
-  would be nice with some explanation on how these numbers were derived.
+  It is the responsibility of the batch producer (LQH+TUP) to
+  stay within these 'batch_size' and 'batch_byte_size' limits.:
 
-  TODO : Check whether these numbers need to be revised w.r.t. read packed
+  - It should stay strictly within the 'batch_size' (#rows) limit.
+  - It is allowed to overallocate the 'batch_byte_size' (slightly)
+    in order to complete the current row when it hit the limit.
+
+  The client should be prepared to receive, and buffer, upto 
+  'batch_size' rows from each fragment.
+  ::ndbrecord_rowsize() might be usefull for calculating the
+  buffersize to allocate for this resultset.
 */
 //static
 void
 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
-                                  const NdbRecord *record,
-                                  const NdbRecAttr *first_rec_attr,
-                                  Uint32 key_size,
                                   Uint32 parallelism,
                                   Uint32& batch_size,
-                                  Uint32& batch_byte_size,
-                                  Uint32& first_batch_size)
+                                  Uint32& batch_byte_size)
 {
   const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
   const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
   const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
   const Uint32 max_batch_size= cfg.m_batch_size;
 
-  Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
-  if (record)
-  {
-    tot_size+= record->m_max_transid_ai_bytes;
-  }
-
-  const NdbRecAttr *rec_attr= first_rec_attr;
-  while (rec_attr != NULL) {
-    Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
-    attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
-    tot_size+= attr_size;
-    rec_attr= rec_attr->next();
+  batch_byte_size= max_batch_byte_size;
+  if (batch_byte_size * parallelism > max_scan_batch_size) {
+    batch_byte_size= max_scan_batch_size / parallelism;
   }
 
-  tot_size+= 32; //include signal overhead
-
-  /**
-   * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
-   * bytes sent for each batch from each node. We do however ensure that
-   * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
-   * batch.
-   */
-  if (batch_size == 0)
-  {
-    batch_byte_size= max_batch_byte_size;
+  if (batch_size == 0 || batch_size > max_batch_size) {
+    batch_size= max_batch_size;
   }
-  else
-  {
-    batch_byte_size= batch_size * tot_size;
+  if (unlikely(batch_size > MAX_PARALLEL_OP_PER_SCAN)) {
+    batch_size= MAX_PARALLEL_OP_PER_SCAN;
   }
-  
-  if (batch_byte_size * parallelism > max_scan_batch_size) {
-    batch_byte_size= max_scan_batch_size / parallelism;
+  if (unlikely(batch_size > batch_byte_size)) {
+    batch_size= batch_byte_size;
   }
-  batch_size= batch_byte_size / tot_size;
-  if (batch_size == 0) {
-    batch_size= 1;
-  } else {
-    if (batch_size > max_batch_size) {
-      batch_size= max_batch_size;
-    } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
-      batch_size= MAX_PARALLEL_OP_PER_SCAN;
-    }
-  }
-  first_batch_size= batch_size;
+
   return;
 }
 
 void
-NdbReceiver::calculate_batch_size(Uint32 key_size,
-                                  Uint32 parallelism,
+NdbReceiver::calculate_batch_size(Uint32 parallelism,
                                   Uint32& batch_size,
-                                  Uint32& batch_byte_size,
-                                  Uint32& first_batch_size,
-                                  const NdbRecord *record) const
+                                  Uint32& batch_byte_size) const
 {
   calculate_batch_size(* m_ndb->theImpl,
-                       record,
-                       theFirstRecAttr,
-                       key_size, parallelism, batch_size, batch_byte_size,
-                       first_batch_size);
+                       parallelism,
+                       batch_size,
+                       batch_byte_size);
 }
 
 void

=== modified file 'storage/ndb/src/ndbapi/NdbRecord.hpp'
--- a/storage/ndb/src/ndbapi/NdbRecord.hpp	2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/ndbapi/NdbRecord.hpp	2011-11-09 13:38:13 +0000
@@ -189,8 +189,6 @@ public:
   Uint32 tableVersion;
   /* Copy of table->m_keyLenInWords. */
   Uint32 m_keyLenInWords;
-  /* Total maximum size of TRANSID_AI data (for computing batch size). */
-  Uint32 m_max_transid_ai_bytes;
   /**
    * Number of distribution keys (usually == number of primary keys).
    *

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-05-20 05:54:20 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-11-09 13:38:13 +0000
@@ -2284,16 +2284,13 @@ int NdbScanOperation::prepareSendScan(Ui
    */
   ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
   Uint32 batch_size = req->first_batch_size; // User specified
-  Uint32 batch_byte_size, first_batch_size;
-  theReceiver.calculate_batch_size(key_size,
-                                   theParallelism,
+  Uint32 batch_byte_size;
+  theReceiver.calculate_batch_size(theParallelism,
                                    batch_size,
-                                   batch_byte_size,
-                                   first_batch_size,
-                                   m_attribute_record);
+                                   batch_byte_size);
   ScanTabReq::setScanBatch(req->requestInfo, batch_size);
   req->batch_byte_size= batch_byte_size;
-  req->first_batch_size= first_batch_size;
+  req->first_batch_size= batch_size;
 
   /**
    * Set keyinfo, nodisk and distribution key flags in 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3580 to 3581) Ole John Aske11 Nov