3581 Ole John Aske 2011-11-09 [merge]
Merge telco-7.0 -> telco-7.0-spj-scan-scan
modified:
mysql-test/suite/ndb/r/ndb_index_stat.result
sql/ha_ndb_index_stat.cc
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
storage/ndb/src/ndbapi/NdbRecord.hpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
3580 Ole John Aske 2011-11-09 [merge]
Merge telco-7.0 -> telco-7.0-spj-scan-scan
added:
mysql-test/suite/ndb/r/ndb_multi_update_delete.result
mysql-test/suite/ndb/t/ndb_multi_update_delete.test
modified:
configure.in
mysql-test/suite/ndb/r/ndb_condition_pushdown.result
mysql-test/suite/ndb/t/ndb_condition_pushdown.test
sql/ha_ndbcluster_cond.cc
sql/ha_ndbcluster_cond.h
sql/sql_select.cc
storage/ndb/ndb_configure.m4
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/vm/Rope.cpp
storage/ndb/src/kernel/vm/Rope.hpp
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-10-13 20:08:25 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-11-09 13:38:13 +0000
@@ -21,18 +21,18 @@ Variable_name Value
ndb_index_stat_enable ON
show global variables like 'ndb_index_stat_option';
Variable_name Value
-ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
set @save_option = @@global.ndb_index_stat_option;
set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
set @@global.ndb_index_stat_option = 'cache_lowpct=85,evict_delay=55';
set @@global.ndb_index_stat_option = 'check_delay=234s';
show global variables like 'ndb_index_stat_option';
Variable_name Value
-ndb_index_stat_option loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85
+ndb_index_stat_option loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85,zero_total=0
set @@global.ndb_index_stat_option = @save_option;
show global variables like 'ndb_index_stat_option';
Variable_name Value
-ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
create table t1 (
a1 int unsigned not null,
b1 int unsigned not null,
=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc 2011-10-20 16:18:28 +0000
+++ b/sql/ha_ndb_index_stat.cc 2011-11-08 21:43:36 +0000
@@ -75,6 +75,7 @@ struct Ndb_index_stat {
struct Ndb_index_stat *list_next;
struct Ndb_index_stat *list_prev;
struct NDB_SHARE *share;
+ bool to_delete; /* detached from share and marked for delete */
Ndb_index_stat();
};
@@ -134,7 +135,8 @@ struct Ndb_index_stat_opt {
Umsec = 4
};
enum Flag {
- Freadonly = (1 << 0)
+ Freadonly = (1 << 0),
+ Fcontrol = (1 << 1)
};
struct Val {
const char* name;
@@ -161,7 +163,8 @@ struct Ndb_index_stat_opt {
Ievict_delay = 13,
Icache_limit = 14,
Icache_lowpct = 15,
- Imax = 16
+ Izero_total = 16,
+ Imax = 17
};
Val val[Imax];
/* Options in string format (SYSVAR ndb_index_stat_option) */
@@ -171,6 +174,10 @@ struct Ndb_index_stat_opt {
assert(i < Imax);
return val[i].val;
}
+ void set(Idx i, uint the_val) {
+ assert(i < Imax);
+ val[i].val = the_val;
+ }
};
Ndb_index_stat_opt::Ndb_index_stat_opt(char* buf) :
@@ -197,8 +204,9 @@ Ndb_index_stat_opt::Ndb_index_stat_opt(c
ival(error_delay, 60, 0, ~0, Utime, 0);
ival(evict_batch, 8, 1, ~0, Usize, 0);
ival(evict_delay, 60, 0, ~0, Utime, 0);
- ival(cache_limit, 32*1024*1024, 1024*1024, ~0, Usize, 0);
+ ival(cache_limit, 32*1024*1024, 0, ~0, Usize, 0);
ival(cache_lowpct, 90, 0, 100, Usize, 0);
+ ival(zero_total, 0, 0, 1, Ubool, Fcontrol);
#undef ival
ndb_index_stat_opt2str(*this, option);
@@ -234,9 +242,9 @@ ndb_index_stat_opt2str(const Ndb_index_s
{
DBUG_ASSERT(v.val == 0 || v.val == 1);
if (v.val == 0)
- my_snprintf(ptr, sz, "%s%s=OFF", sep, v.name);
+ my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
else
- my_snprintf(ptr, sz, "%s%s=ON", sep, v.name);
+ my_snprintf(ptr, sz, "%s%s=1", sep, v.name);
}
break;
@@ -308,12 +316,14 @@ ndb_index_stat_option_parse(char* p, Ndb
if (*r == 0)
DBUG_RETURN(-1);
+ bool found= false;
const uint imax= Ndb_index_stat_opt::Imax;
for (uint i= 0; i < imax; i++)
{
Ndb_index_stat_opt::Val& v= opt.val[i];
if (strcmp(p, v.name) != 0)
continue;
+ found= true;
char *s;
for (s= r; *s != 0; s++)
@@ -400,6 +410,9 @@ ndb_index_stat_option_parse(char* p, Ndb
break;
}
}
+
+ if (!found)
+ DBUG_RETURN(-1);
DBUG_RETURN(0);
}
@@ -503,15 +516,23 @@ struct Ndb_index_stat_glob {
uint wait_update;
uint no_stats;
uint wait_stats;
+ /* Accumulating counters */
+ uint analyze_count;
+ uint analyze_error;
+ uint query_count;
+ uint query_no_stats;
+ uint query_error;
uint event_ok; /* Events received for known index */
uint event_miss; /* Events received for unknown index */
- char status[2][512];
- uint status_i;
+ /* Cache */
uint cache_query_bytes; /* In use */
uint cache_clean_bytes; /* Obsolete versions not yet removed */
+ char status[2][512];
+ uint status_i;
Ndb_index_stat_glob();
void set_status();
+ void zero_total();
};
Ndb_index_stat_glob::Ndb_index_stat_glob()
@@ -524,12 +545,17 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
wait_update= 0;
no_stats= 0;
wait_stats= 0;
+ analyze_count= 0;
+ analyze_error= 0;
+ query_count= 0;
+ query_no_stats= 0;
+ query_error= 0;
event_ok= 0;
event_miss= 0;
- memset(status, 0, sizeof(status));
- status_i= 0;
cache_query_bytes= 0;
cache_clean_bytes= 0;
+ memset(status, 0, sizeof(status));
+ status_i= 0;
}
/* Update status variable (must hold stat_mutex) */
@@ -541,7 +567,7 @@ Ndb_index_stat_glob::set_status()
// stats thread
th_allow= ndb_index_stat_allow();
- sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%ums",
+ sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%u",
th_allow, th_enable, th_busy, th_loop);
p+= strlen(p);
@@ -562,11 +588,19 @@ Ndb_index_stat_glob::set_status()
// special counters
sprintf(p, ",analyze:(queue:%u,wait:%u)", force_update, wait_update);
p+= strlen(p);
- sprintf(p, ",stats:(none:%u,wait:%u)", no_stats, wait_stats);
+ sprintf(p, ",stats:(nostats:%u,wait:%u)", no_stats, wait_stats);
p+= strlen(p);
- // events
- sprintf(p, ",events:(ok:%u,miss:%u)", event_ok, event_miss);
+ // accumulating counters
+ sprintf(p, ",total:(");
+ p+= strlen(p);
+ sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
+ p+= strlen(p);
+ sprintf(p, ",query:(all:%u,nostats:%u,error:%u)", query_count, query_no_stats, query_error);
+ p+= strlen(p);
+ sprintf(p, ",event:(ok:%u,miss:%u)", event_ok, event_miss);
+ p+= strlen(p);
+ sprintf(p, ")");
p+= strlen(p);
// cache size
@@ -575,7 +609,7 @@ Ndb_index_stat_glob::set_status()
double cache_pct= (double)0.0;
if (cache_limit != 0)
cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
- sprintf(p, ",cache:(query:%u,clean:%u,total:%.2f%%)",
+ sprintf(p, ",cache:(query:%u,clean:%u,totalpct:%.2f)",
cache_query_bytes, cache_clean_bytes, cache_pct);
p+= strlen(p);
@@ -588,6 +622,19 @@ Ndb_index_stat_glob::set_status()
pthread_mutex_unlock(&LOCK_global_system_variables);
}
+/* Zero accumulating counters */
+void
+Ndb_index_stat_glob::zero_total()
+{
+ analyze_count= 0;
+ analyze_error= 0;
+ query_count= 0;
+ query_no_stats= 0;
+ query_error= 0;
+ event_ok= 0;
+ event_miss= 0;
+}
+
Ndb_index_stat_glob ndb_index_stat_glob;
/* Shared index entries */
@@ -616,6 +663,7 @@ Ndb_index_stat::Ndb_index_stat()
list_next= 0;
list_prev= 0;
share= 0;
+ to_delete= false;
}
void
@@ -908,9 +956,15 @@ ndb_index_stat_get_share(NDB_SHARE *shar
return st;
}
+/*
+ Prepare to delete index stat entry. Remove it from per-share
+ list and set "to_delete" flag. Stats thread does real delete.
+*/
+
void
ndb_index_stat_free(Ndb_index_stat *st)
{
+ DBUG_ENTER("ndb_index_stat_free");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
pthread_mutex_lock(&ndb_index_stat_list_mutex);
NDB_SHARE *share= st->share;
@@ -924,10 +978,13 @@ ndb_index_stat_free(Ndb_index_stat *st)
{
if (st == st_loop)
{
+ DBUG_PRINT("index_stat", ("st %s stat free one", st->id));
+ st->share_next= 0;
st->share= 0;
assert(st->lt != 0);
assert(st->lt != Ndb_index_stat::LT_Delete);
- ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+ assert(!st->to_delete);
+ st->to_delete= true;
st_loop= st_loop->share_next;
assert(!found);
found++;
@@ -950,26 +1007,32 @@ ndb_index_stat_free(Ndb_index_stat *st)
glob.set_status();
pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ DBUG_VOID_RETURN;
}
void
ndb_index_stat_free(NDB_SHARE *share)
{
+ DBUG_ENTER("ndb_index_stat_free");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
pthread_mutex_lock(&ndb_index_stat_list_mutex);
Ndb_index_stat *st;
while ((st= share->index_stat_list) != 0)
{
+ DBUG_PRINT("index_stat", ("st %s stat free all", st->id));
share->index_stat_list= st->share_next;
+ st->share_next= 0;
st->share= 0;
assert(st->lt != 0);
assert(st->lt != Ndb_index_stat::LT_Delete);
- ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+ assert(!st->to_delete);
+ st->to_delete= true;
}
pthread_mutex_lock(&ndb_index_stat_stat_mutex);
glob.set_status();
pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ DBUG_VOID_RETURN;
}
/* Find entry across shares */
@@ -1221,9 +1284,15 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
st->check_time == 0 ? 0 : st->check_time + check_delay - pr.now;
DBUG_PRINT("index_stat", ("st %s check wait:%lds force update:%u"
- " clean wait:%lds cache clean:%d",
+ " clean wait:%lds cache clean:%d to delete:%d",
st->id, (long)check_wait, st->force_update,
- (long)clean_wait, st->cache_clean));
+ (long)clean_wait, st->cache_clean, st->to_delete));
+
+ if (st->to_delete)
+ {
+ pr.lt= Ndb_index_stat::LT_Delete;
+ return;
+ }
if (!st->cache_clean && clean_wait <= 0)
{
@@ -1493,6 +1562,13 @@ ndb_index_stat_proc_delete(Ndb_index_sta
Ndb_index_stat *st= st_loop;
st_loop= st_loop->list_next;
DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+
+ // adjust global counters at drop
+ pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ ndb_index_stat_force_update(st, false);
+ ndb_index_stat_no_stats(st, false);
+ pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+
ndb_index_stat_proc_evict(pr, st);
ndb_index_stat_list_remove(st);
delete st->is;
@@ -1514,6 +1590,12 @@ ndb_index_stat_proc_error(Ndb_index_stat
const int error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
const time_t error_wait= st->error_time + error_delay - pr.now;
+ if (st->to_delete)
+ {
+ pr.lt= Ndb_index_stat::LT_Delete;
+ return;
+ }
+
if (error_wait <= 0 ||
/* Analyze issued after previous error */
st->force_update)
@@ -1638,8 +1720,62 @@ ndb_index_stat_proc_event(Ndb_index_stat
pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
}
+/* Control options */
+
+void
+ndb_index_stat_proc_control(Ndb_index_stat_proc &pr)
+{
+ Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+ Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+
+ /* Request to zero accumulating counters */
+ if (opt.get(Ndb_index_stat_opt::Izero_total) == true)
+ {
+ pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ glob.zero_total();
+ glob.set_status();
+ opt.set(Ndb_index_stat_opt::Izero_total, false);
+ pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ }
+}
+
#ifndef DBUG_OFF
void
+ndb_index_stat_entry_verify(const Ndb_index_stat *st)
+{
+ const NDB_SHARE *share= st->share;
+ if (st->to_delete)
+ {
+ assert(st->share_next == 0);
+ assert(share == 0);
+ }
+ else
+ {
+ assert(share != 0);
+ const Ndb_index_stat *st2= share->index_stat_list;
+ assert(st2 != 0);
+ uint found= 0;
+ while (st2 != 0)
+ {
+ assert(st2->share == share);
+ const Ndb_index_stat *st3= st2->share_next;
+ uint guard= 0;
+ while (st3 != 0)
+ {
+ assert(st2 != st3);
+ guard++;
+ assert(guard <= 1000); // MAX_INDEXES
+ st3= st3->share_next;
+ }
+ if (st == st2)
+ found++;
+ st2= st2->share_next;
+ }
+ assert(found == 1);
+ }
+}
+
+void
ndb_index_stat_list_verify(int lt)
{
const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
@@ -1684,6 +1820,7 @@ ndb_index_stat_list_verify(int lt)
assert(guard <= list.count);
st2= st2->list_next;
}
+ ndb_index_stat_entry_verify(st);
st= st->list_next;
}
assert(count == list.count);
@@ -1717,6 +1854,9 @@ void
ndb_index_stat_proc(Ndb_index_stat_proc &pr)
{
DBUG_ENTER("ndb_index_stat_proc");
+
+ ndb_index_stat_proc_control(pr);
+
#ifndef DBUG_OFF
ndb_index_stat_list_verify();
Ndb_index_stat_glob old_glob= ndb_index_stat_glob;
@@ -2161,9 +2301,15 @@ ndb_index_stat_wait(Ndb_index_stat *st,
if (count == 0)
{
if (!from_analyze)
+ {
glob.wait_stats++;
+ glob.query_count++;
+ }
else
+ {
glob.wait_update++;
+ glob.analyze_count++;
+ }
if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
{
err= Ndb_index_stat_error_HAS_ERROR;
@@ -2175,12 +2321,17 @@ ndb_index_stat_wait(Ndb_index_stat *st,
{
/* Have detected no stats now or before */
err= NdbIndexStat::NoIndexStats;
+ glob.query_no_stats++;
break;
}
if (st->error.code != 0)
{
/* A new error has occured */
err= st->error.code;
+ if (!from_analyze)
+ glob.query_error++;
+ else
+ glob.analyze_error++;
break;
}
if (st->sample_version > sample_version)
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-17 12:53:58 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-11-09 13:38:13 +0000
@@ -105,16 +105,13 @@ private:
static
void calculate_batch_size(const NdbImpl&,
- const NdbRecord *,
- const NdbRecAttr *first_rec_attr,
- Uint32, Uint32, Uint32&, Uint32&, Uint32&);
-
- void calculate_batch_size(Uint32 key_size,
Uint32 parallelism,
Uint32& batch_size,
- Uint32& batch_byte_size,
- Uint32& first_batch_size,
- const NdbRecord *rec) const;
+ Uint32& batch_byte_size);
+
+ void calculate_batch_size(Uint32 parallelism,
+ Uint32& batch_size,
+ Uint32& batch_byte_size) const;
/*
Set up buffers for receiving TRANSID_AI and KEYINFO20 signals
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-11-09 09:53:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-11-09 13:38:13 +0000
@@ -5215,6 +5215,7 @@ Dbspj::scanIndex_send(Signal* signal,
jam();
ndbassert(bs_bytes > 0);
ndbassert(bs_rows > 0);
+ ndbassert(bs_rows <= bs_bytes);
/**
* if (m_bits & prunemask):
* - Range keys sliced out to each ScanFragHandle
@@ -5556,37 +5557,43 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
org->batch_size_rows / data.m_parallelism * (data.m_parallelism - 1)
+ data.m_totalRows;
- // Number of rows that we can still fetch in this batch.
+ // Number of rows & bytes that we can still fetch in this batch.
const Int32 remainingRows
= static_cast<Int32>(org->batch_size_rows - maxCorrVal);
-
+ const Int32 remainingBytes
+ = static_cast<Int32>(org->batch_size_bytes - data.m_totalBytes);
+
if (remainingRows >= data.m_frags_not_started &&
+ remainingBytes >= data.m_frags_not_started &&
/**
* Check that (remaning row capacity)/(remaining fragments) is
* greater or equal to (rows read so far)/(finished fragments).
*/
remainingRows * static_cast<Int32>(data.m_parallelism) >=
- static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
- (org->batch_size_bytes - data.m_totalBytes) * data.m_parallelism >=
- data.m_totalBytes * data.m_frags_not_started)
+ static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
+ remainingBytes * static_cast<Int32>(data.m_parallelism) >=
+ static_cast<Int32>(data.m_totalBytes * data.m_frags_not_started))
{
jam();
Uint32 batchRange = maxCorrVal;
+ Uint32 bs_rows = remainingRows / data.m_frags_not_started;
+ Uint32 bs_bytes = remainingBytes / data.m_frags_not_started;
+
DEBUG("::scanIndex_execSCAN_FRAGCONF() first batch was not full."
" Asking for new batches from " << data.m_frags_not_started <<
" fragments with " <<
- remainingRows / data.m_frags_not_started
- <<" rows and " <<
- (org->batch_size_bytes - data.m_totalBytes)
- / data.m_frags_not_started
- << " bytes.");
+ bs_rows <<" rows and " <<
+ bs_bytes << " bytes.");
+
+ if (unlikely(bs_rows > bs_bytes))
+ bs_rows = bs_bytes;
+
scanIndex_send(signal,
requestPtr,
treeNodePtr,
data.m_frags_not_started,
- (org->batch_size_bytes - data.m_totalBytes)
- / data.m_frags_not_started,
- remainingRows / data.m_frags_not_started,
+ bs_bytes,
+ bs_rows,
batchRange);
return;
}
=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2011-10-21 08:59:23 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2011-11-09 13:10:53 +0000
@@ -6795,8 +6795,6 @@ NdbDictionaryImpl::initialiseColumnData(
recCol->orgAttrSize= col->m_orgAttrSize;
if (recCol->offset+recCol->maxSize > rec->m_row_size)
rec->m_row_size= recCol->offset+recCol->maxSize;
- /* Round data size to whole words + 4 bytes of AttributeHeader. */
- rec->m_max_transid_ai_bytes+= (recCol->maxSize+7) & ~3;
recCol->charset_info= col->m_cs;
recCol->compare_function= NdbSqlUtil::getType(col->m_type).m_cmp;
recCol->flags= 0;
@@ -6985,7 +6983,6 @@ NdbDictionaryImpl::createRecord(const Nd
}
rec->m_row_size= 0;
- rec->m_max_transid_ai_bytes= 0;
for (i= 0; i<length; i++)
{
const NdbDictionary::RecordSpecification *rs= &recSpec[i];
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-09-19 08:13:58 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-11-08 21:37:30 +0000
@@ -878,6 +878,11 @@ NdbIndexStatImpl::sys_read_head(Con& con
return -1;
if (sys_head_getvalue(con) == -1)
return -1;
+ if (con.m_op->setAbortOption(NdbOperation::AbortOnError) == -1)
+ {
+ setError(con, __LINE__);
+ return -1;
+ }
if (con.execute(commit) == -1)
{
setError(con, __LINE__);
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-10-28 13:45:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-11-09 13:38:13 +0000
@@ -3058,20 +3058,16 @@ NdbQueryImpl::doSend(int nodeId, bool la
scanTabReq->transId2 = (Uint32) (transId >> 32);
Uint32 batchRows = root.getMaxBatchRows();
- Uint32 batchByteSize, firstBatchRows;
+ Uint32 batchByteSize;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
- root.m_ndbRecord,
- root.m_firstRecAttr,
- 0, // Key size.
getRootFragCount(),
batchRows,
- batchByteSize,
- firstBatchRows);
+ batchByteSize);
assert(batchRows==root.getMaxBatchRows());
- assert(batchRows==firstBatchRows);
+ assert(batchRows<=batchByteSize);
ScanTabReq::setScanBatch(reqInfo, batchRows);
scanTabReq->batch_byte_size = batchByteSize;
- scanTabReq->first_batch_size = firstBatchRows;
+ scanTabReq->first_batch_size = batchRows;
ScanTabReq::setViaSPJFlag(reqInfo, 1);
ScanTabReq::setPassAllConfsFlag(reqInfo, 1);
@@ -4361,11 +4357,11 @@ NdbQueryOperationImpl
* We must thus make sure that we do not set a batch size for the scan
* that exceeds what any of its scan descendants can use.
*
- * Ignore calculated 'batchByteSize' and 'firstBatchRows'
+ * Ignore calculated 'batchByteSize'
* here - Recalculated when building signal after max-batchRows has been
* determined.
*/
- Uint32 batchByteSize, firstBatchRows;
+ Uint32 batchByteSize;
/**
* myClosestScan->m_maxBatchRows may be zero to indicate that we
* should use default values, or non-zero if the application had an
@@ -4373,18 +4369,14 @@ NdbQueryOperationImpl
*/
maxBatchRows = myClosestScan->m_maxBatchRows;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
- m_ndbRecord,
- m_firstRecAttr,
- 0, // Key size.
getRoot().m_parallelism
- == Parallelism_max ?
- m_queryImpl.getRootFragCount() :
- getRoot().m_parallelism,
+ == Parallelism_max
+ ? m_queryImpl.getRootFragCount()
+ : getRoot().m_parallelism,
maxBatchRows,
- batchByteSize,
- firstBatchRows);
+ batchByteSize);
assert(maxBatchRows > 0);
- assert(firstBatchRows == maxBatchRows);
+ assert(maxBatchRows <= batchByteSize);
}
// Find the largest value that is acceptable to all lookup descendants.
@@ -4554,17 +4546,13 @@ NdbQueryOperationImpl::prepareAttrInfo(U
Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
Uint32 batchRows = getMaxBatchRows();
- Uint32 batchByteSize, firstBatchRows;
+ Uint32 batchByteSize;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
- m_ndbRecord,
- m_firstRecAttr,
- 0, // Key size.
m_queryImpl.getRootFragCount(),
batchRows,
- batchByteSize,
- firstBatchRows);
- assert(batchRows == firstBatchRows);
+ batchByteSize);
assert(batchRows == getMaxBatchRows());
+ assert(batchRows <= batchByteSize);
assert(m_parallelism == Parallelism_max ||
m_parallelism == Parallelism_adaptive);
if (m_parallelism == Parallelism_max)
=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-08-17 12:53:58 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-11-09 13:38:13 +0000
@@ -155,88 +155,57 @@ NdbReceiver::prepareRead(char *buf, Uint
Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
use, taking into account limits in the transporter, user preference, etc.
- Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
- would be nice with some explanation on how these numbers were derived.
+ It is the responsibility of the batch producer (LQH+TUP) to
+ stay within these 'batch_size' and 'batch_byte_size' limits.:
- TODO : Check whether these numbers need to be revised w.r.t. read packed
+ - It should stay strictly within the 'batch_size' (#rows) limit.
+ - It is allowed to overallocate the 'batch_byte_size' (slightly)
+ in order to complete the current row when it hit the limit.
+
+ The client should be prepared to receive, and buffer, upto
+ 'batch_size' rows from each fragment.
+ ::ndbrecord_rowsize() might be usefull for calculating the
+ buffersize to allocate for this resultset.
*/
//static
void
NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
- const NdbRecord *record,
- const NdbRecAttr *first_rec_attr,
- Uint32 key_size,
Uint32 parallelism,
Uint32& batch_size,
- Uint32& batch_byte_size,
- Uint32& first_batch_size)
+ Uint32& batch_byte_size)
{
const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
const Uint32 max_batch_size= cfg.m_batch_size;
- Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
- if (record)
- {
- tot_size+= record->m_max_transid_ai_bytes;
- }
-
- const NdbRecAttr *rec_attr= first_rec_attr;
- while (rec_attr != NULL) {
- Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
- attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
- tot_size+= attr_size;
- rec_attr= rec_attr->next();
+ batch_byte_size= max_batch_byte_size;
+ if (batch_byte_size * parallelism > max_scan_batch_size) {
+ batch_byte_size= max_scan_batch_size / parallelism;
}
- tot_size+= 32; //include signal overhead
-
- /**
- * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
- * bytes sent for each batch from each node. We do however ensure that
- * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
- * batch.
- */
- if (batch_size == 0)
- {
- batch_byte_size= max_batch_byte_size;
+ if (batch_size == 0 || batch_size > max_batch_size) {
+ batch_size= max_batch_size;
}
- else
- {
- batch_byte_size= batch_size * tot_size;
+ if (unlikely(batch_size > MAX_PARALLEL_OP_PER_SCAN)) {
+ batch_size= MAX_PARALLEL_OP_PER_SCAN;
}
-
- if (batch_byte_size * parallelism > max_scan_batch_size) {
- batch_byte_size= max_scan_batch_size / parallelism;
+ if (unlikely(batch_size > batch_byte_size)) {
+ batch_size= batch_byte_size;
}
- batch_size= batch_byte_size / tot_size;
- if (batch_size == 0) {
- batch_size= 1;
- } else {
- if (batch_size > max_batch_size) {
- batch_size= max_batch_size;
- } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
- batch_size= MAX_PARALLEL_OP_PER_SCAN;
- }
- }
- first_batch_size= batch_size;
+
return;
}
void
-NdbReceiver::calculate_batch_size(Uint32 key_size,
- Uint32 parallelism,
+NdbReceiver::calculate_batch_size(Uint32 parallelism,
Uint32& batch_size,
- Uint32& batch_byte_size,
- Uint32& first_batch_size,
- const NdbRecord *record) const
+ Uint32& batch_byte_size) const
{
calculate_batch_size(* m_ndb->theImpl,
- record,
- theFirstRecAttr,
- key_size, parallelism, batch_size, batch_byte_size,
- first_batch_size);
+ parallelism,
+ batch_size,
+ batch_byte_size);
}
void
=== modified file 'storage/ndb/src/ndbapi/NdbRecord.hpp'
--- a/storage/ndb/src/ndbapi/NdbRecord.hpp 2011-07-09 11:16:31 +0000
+++ b/storage/ndb/src/ndbapi/NdbRecord.hpp 2011-11-09 13:38:13 +0000
@@ -189,8 +189,6 @@ public:
Uint32 tableVersion;
/* Copy of table->m_keyLenInWords. */
Uint32 m_keyLenInWords;
- /* Total maximum size of TRANSID_AI data (for computing batch size). */
- Uint32 m_max_transid_ai_bytes;
/**
* Number of distribution keys (usually == number of primary keys).
*
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-05-20 05:54:20 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-11-09 13:38:13 +0000
@@ -2284,16 +2284,13 @@ int NdbScanOperation::prepareSendScan(Ui
*/
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
Uint32 batch_size = req->first_batch_size; // User specified
- Uint32 batch_byte_size, first_batch_size;
- theReceiver.calculate_batch_size(key_size,
- theParallelism,
+ Uint32 batch_byte_size;
+ theReceiver.calculate_batch_size(theParallelism,
batch_size,
- batch_byte_size,
- first_batch_size,
- m_attribute_record);
+ batch_byte_size);
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
req->batch_byte_size= batch_byte_size;
- req->first_batch_size= first_batch_size;
+ req->first_batch_size= batch_size;
/**
* Set keyinfo, nodisk and distribution key flags in
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3580 to 3581) | Ole John Aske | 11 Nov |