4331 Jonas Oreland 2011-11-10 [merge]
ndb - merge 70 to 71
added:
sql/ndb_component.cc
sql/ndb_component.h
sql/ndb_util_thread.h
modified:
libmysqld/Makefile.am
mysql-test/suite/ndb/r/ndb_index_stat.result
sql/Makefile.am
sql/ha_ndb_index_stat.cc
sql/ha_ndb_index_stat.h
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
storage/ndb/CMakeLists.txt
storage/ndb/include/kernel/signaldata/ScanFrag.hpp
storage/ndb/include/kernel/signaldata/TupKey.hpp
storage/ndb/include/ndbapi/NdbReceiver.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbReceiver.cpp
storage/ndb/src/ndbapi/NdbRecord.hpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/test/ndbapi/flexAsynch.cpp
4330 jonas oreland 2011-11-08
ndb - bump version to 7.1.18
modified:
configure.in
storage/ndb/ndb_configure.m4
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am 2011-09-08 06:22:07 +0000
+++ b/libmysqld/Makefile.am 2011-11-10 14:23:53 +0000
@@ -49,7 +49,8 @@ sqlsources = derror.cc field.cc field_co
ha_ndbcluster.cc ha_ndbcluster_cond.cc \
ha_ndbcluster_connection.cc ha_ndbinfo.cc \
ha_ndb_index_stat.cc \
- ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ha_partition.cc \
+ ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ndb_component.cc \
+ ha_partition.cc \
handler.cc sql_handler.cc \
hostname.cc init.cc password.c \
item.cc item_buff.cc item_cmpfunc.cc item_create.cc \
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-10-08 16:56:43 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-11-09 08:27:32 +0000
@@ -21,18 +21,18 @@ Variable_name Value
ndb_index_stat_enable ON
show global variables like 'ndb_index_stat_option';
Variable_name Value
-ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
set @save_option = @@global.ndb_index_stat_option;
set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
set @@global.ndb_index_stat_option = 'cache_lowpct=85,evict_delay=55';
set @@global.ndb_index_stat_option = 'check_delay=234s';
show global variables like 'ndb_index_stat_option';
Variable_name Value
-ndb_index_stat_option loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85
+ndb_index_stat_option loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85,zero_total=0
set @@global.ndb_index_stat_option = @save_option;
show global variables like 'ndb_index_stat_option';
Variable_name Value
-ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
create table t1 (
a1 int unsigned not null,
b1 int unsigned not null,
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2011-09-08 06:22:07 +0000
+++ b/sql/Makefile.am 2011-11-10 14:23:53 +0000
@@ -64,6 +64,8 @@ noinst_HEADERS = item.h item_func.h item
ha_ndb_index_stat.h \
ndb_mi.h \
ndb_conflict_trans.h \
+ ndb_component.h \
+ ndb_util_thread.h \
ha_partition.h rpl_constants.h \
debug_sync.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
@@ -141,7 +143,8 @@ libndb_la_SOURCES= ha_ndbcluster.cc \
ha_ndb_index_stat.cc \
ha_ndbinfo.cc \
ndb_mi.cc \
- ndb_conflict_trans.cc
+ ndb_conflict_trans.cc \
+ ndb_component.cc
gen_lex_hash_SOURCES = gen_lex_hash.cc
gen_lex_hash_LDFLAGS = @NOINST_LDFLAGS@
=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc 2011-10-20 16:18:28 +0000
+++ b/sql/ha_ndb_index_stat.cc 2011-11-10 10:35:09 +0000
@@ -24,6 +24,16 @@
#include <mysql/plugin.h>
#include <ctype.h>
+/* from other files */
+extern struct st_ndb_status g_ndb_status;
+extern pthread_mutex_t ndbcluster_mutex;
+
+/* these have to live in ha_ndbcluster.cc */
+extern bool ndb_index_stat_get_enable(THD *thd);
+extern const char* g_ndb_status_index_stat_status;
+extern long g_ndb_status_index_stat_cache_query;
+extern long g_ndb_status_index_stat_cache_clean;
+
// Do we have waiter...
static bool ndb_index_stat_waiter= false;
@@ -40,6 +50,28 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Index NDBINDEX;
+/** ndb_index_stat_thread */
+Ndb_index_stat_thread::Ndb_index_stat_thread()
+ : running(-1)
+{
+ pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND, NULL);
+ pthread_cond_init(&COND_ready, NULL);
+ pthread_mutex_init(&list_mutex, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&stat_mutex, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&stat_cond, NULL);
+}
+
+Ndb_index_stat_thread::~Ndb_index_stat_thread()
+{
+ pthread_mutex_destroy(&LOCK);
+ pthread_cond_destroy(&COND);
+ pthread_cond_destroy(&COND_ready);
+ pthread_mutex_destroy(&list_mutex);
+ pthread_mutex_destroy(&stat_mutex);
+ pthread_cond_destroy(&stat_cond);
+}
+
struct Ndb_index_stat {
enum {
LT_Undef= 0,
@@ -75,6 +107,7 @@ struct Ndb_index_stat {
struct Ndb_index_stat *list_next;
struct Ndb_index_stat *list_prev;
struct NDB_SHARE *share;
+ bool to_delete; /* detached from share and marked for delete */
Ndb_index_stat();
};
@@ -134,7 +167,8 @@ struct Ndb_index_stat_opt {
Umsec = 4
};
enum Flag {
- Freadonly = (1 << 0)
+ Freadonly = (1 << 0),
+ Fcontrol = (1 << 1)
};
struct Val {
const char* name;
@@ -161,7 +195,8 @@ struct Ndb_index_stat_opt {
Ievict_delay = 13,
Icache_limit = 14,
Icache_lowpct = 15,
- Imax = 16
+ Izero_total = 16,
+ Imax = 17
};
Val val[Imax];
/* Options in string format (SYSVAR ndb_index_stat_option) */
@@ -171,6 +206,10 @@ struct Ndb_index_stat_opt {
assert(i < Imax);
return val[i].val;
}
+ void set(Idx i, uint the_val) {
+ assert(i < Imax);
+ val[i].val = the_val;
+ }
};
Ndb_index_stat_opt::Ndb_index_stat_opt(char* buf) :
@@ -197,8 +236,9 @@ Ndb_index_stat_opt::Ndb_index_stat_opt(c
ival(error_delay, 60, 0, ~0, Utime, 0);
ival(evict_batch, 8, 1, ~0, Usize, 0);
ival(evict_delay, 60, 0, ~0, Utime, 0);
- ival(cache_limit, 32*1024*1024, 1024*1024, ~0, Usize, 0);
+ ival(cache_limit, 32*1024*1024, 0, ~0, Usize, 0);
ival(cache_lowpct, 90, 0, 100, Usize, 0);
+ ival(zero_total, 0, 0, 1, Ubool, Fcontrol);
#undef ival
ndb_index_stat_opt2str(*this, option);
@@ -234,9 +274,9 @@ ndb_index_stat_opt2str(const Ndb_index_s
{
DBUG_ASSERT(v.val == 0 || v.val == 1);
if (v.val == 0)
- my_snprintf(ptr, sz, "%s%s=OFF", sep, v.name);
+ my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
else
- my_snprintf(ptr, sz, "%s%s=ON", sep, v.name);
+ my_snprintf(ptr, sz, "%s%s=1", sep, v.name);
}
break;
@@ -308,12 +348,14 @@ ndb_index_stat_option_parse(char* p, Ndb
if (*r == 0)
DBUG_RETURN(-1);
+ bool found= false;
const uint imax= Ndb_index_stat_opt::Imax;
for (uint i= 0; i < imax; i++)
{
Ndb_index_stat_opt::Val& v= opt.val[i];
if (strcmp(p, v.name) != 0)
continue;
+ found= true;
char *s;
for (s= r; *s != 0; s++)
@@ -400,6 +442,9 @@ ndb_index_stat_option_parse(char* p, Ndb
break;
}
}
+
+ if (!found)
+ DBUG_RETURN(-1);
DBUG_RETURN(0);
}
@@ -503,15 +548,23 @@ struct Ndb_index_stat_glob {
uint wait_update;
uint no_stats;
uint wait_stats;
+ /* Accumulating counters */
+ uint analyze_count;
+ uint analyze_error;
+ uint query_count;
+ uint query_no_stats;
+ uint query_error;
uint event_ok; /* Events received for known index */
uint event_miss; /* Events received for unknown index */
- char status[2][512];
- uint status_i;
+ /* Cache */
uint cache_query_bytes; /* In use */
uint cache_clean_bytes; /* Obsolete versions not yet removed */
+ char status[2][512];
+ uint status_i;
Ndb_index_stat_glob();
void set_status();
+ void zero_total();
};
Ndb_index_stat_glob::Ndb_index_stat_glob()
@@ -524,12 +577,17 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
wait_update= 0;
no_stats= 0;
wait_stats= 0;
+ analyze_count= 0;
+ analyze_error= 0;
+ query_count= 0;
+ query_no_stats= 0;
+ query_error= 0;
event_ok= 0;
event_miss= 0;
- memset(status, 0, sizeof(status));
- status_i= 0;
cache_query_bytes= 0;
cache_clean_bytes= 0;
+ memset(status, 0, sizeof(status));
+ status_i= 0;
}
/* Update status variable (must hold stat_mutex) */
@@ -541,7 +599,7 @@ Ndb_index_stat_glob::set_status()
// stats thread
th_allow= ndb_index_stat_allow();
- sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%ums",
+ sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%u",
th_allow, th_enable, th_busy, th_loop);
p+= strlen(p);
@@ -562,11 +620,19 @@ Ndb_index_stat_glob::set_status()
// special counters
sprintf(p, ",analyze:(queue:%u,wait:%u)", force_update, wait_update);
p+= strlen(p);
- sprintf(p, ",stats:(none:%u,wait:%u)", no_stats, wait_stats);
+ sprintf(p, ",stats:(nostats:%u,wait:%u)", no_stats, wait_stats);
p+= strlen(p);
- // events
- sprintf(p, ",events:(ok:%u,miss:%u)", event_ok, event_miss);
+ // accumulating counters
+ sprintf(p, ",total:(");
+ p+= strlen(p);
+ sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
+ p+= strlen(p);
+ sprintf(p, ",query:(all:%u,nostats:%u,error:%u)", query_count, query_no_stats, query_error);
+ p+= strlen(p);
+ sprintf(p, ",event:(ok:%u,miss:%u)", event_ok, event_miss);
+ p+= strlen(p);
+ sprintf(p, ")");
p+= strlen(p);
// cache size
@@ -575,7 +641,7 @@ Ndb_index_stat_glob::set_status()
double cache_pct= (double)0.0;
if (cache_limit != 0)
cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
- sprintf(p, ",cache:(query:%u,clean:%u,total:%.2f%%)",
+ sprintf(p, ",cache:(query:%u,clean:%u,totalpct:%.2f)",
cache_query_bytes, cache_clean_bytes, cache_pct);
p+= strlen(p);
@@ -588,6 +654,19 @@ Ndb_index_stat_glob::set_status()
pthread_mutex_unlock(&LOCK_global_system_variables);
}
+/* Zero accumulating counters */
+void
+Ndb_index_stat_glob::zero_total()
+{
+ analyze_count= 0;
+ analyze_error= 0;
+ query_count= 0;
+ query_no_stats= 0;
+ query_error= 0;
+ event_ok= 0;
+ event_miss= 0;
+}
+
Ndb_index_stat_glob ndb_index_stat_glob;
/* Shared index entries */
@@ -616,6 +695,7 @@ Ndb_index_stat::Ndb_index_stat()
list_next= 0;
list_prev= 0;
share= 0;
+ to_delete= false;
}
void
@@ -864,8 +944,8 @@ ndb_index_stat_get_share(NDB_SHARE *shar
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
pthread_mutex_lock(&share->mutex);
- pthread_mutex_lock(&ndb_index_stat_list_mutex);
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
time_t now= ndb_index_stat_time();
err_out= 0;
@@ -902,17 +982,23 @@ ndb_index_stat_get_share(NDB_SHARE *shar
}
while (0);
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
pthread_mutex_unlock(&share->mutex);
return st;
}
+/*
+ Prepare to delete index stat entry. Remove it from per-share
+ list and set "to_delete" flag. Stats thread does real delete.
+*/
+
void
ndb_index_stat_free(Ndb_index_stat *st)
{
+ DBUG_ENTER("ndb_index_stat_free");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
NDB_SHARE *share= st->share;
assert(share != 0);
@@ -924,10 +1010,13 @@ ndb_index_stat_free(Ndb_index_stat *st)
{
if (st == st_loop)
{
+ DBUG_PRINT("index_stat", ("st %s stat free one", st->id));
+ st->share_next= 0;
st->share= 0;
assert(st->lt != 0);
assert(st->lt != Ndb_index_stat::LT_Delete);
- ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+ assert(!st->to_delete);
+ st->to_delete= true;
st_loop= st_loop->share_next;
assert(!found);
found++;
@@ -946,30 +1035,36 @@ ndb_index_stat_free(Ndb_index_stat *st)
assert(found);
share->index_stat_list= st_head;
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+ DBUG_VOID_RETURN;
}
void
ndb_index_stat_free(NDB_SHARE *share)
{
+ DBUG_ENTER("ndb_index_stat_free");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
Ndb_index_stat *st;
while ((st= share->index_stat_list) != 0)
{
+ DBUG_PRINT("index_stat", ("st %s stat free all", st->id));
share->index_stat_list= st->share_next;
+ st->share_next= 0;
st->share= 0;
assert(st->lt != 0);
assert(st->lt != Ndb_index_stat::LT_Delete);
- ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+ assert(!st->to_delete);
+ st->to_delete= true;
}
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+ DBUG_VOID_RETURN;
}
/* Find entry across shares */
@@ -979,7 +1074,7 @@ ndb_index_stat_find_entry(int index_id,
{
DBUG_ENTER("ndb_index_stat_find_entry");
pthread_mutex_lock(&ndbcluster_mutex);
- pthread_mutex_lock(&ndb_index_stat_list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
index_id, index_version, table_id));
@@ -992,7 +1087,7 @@ ndb_index_stat_find_entry(int index_id,
if (st->index_id == index_id &&
st->index_version == index_version)
{
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(st);
}
@@ -1000,7 +1095,7 @@ ndb_index_stat_find_entry(int index_id,
}
}
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
@@ -1074,7 +1169,7 @@ void
ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
{
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
const int lt= Ndb_index_stat::LT_New;
Ndb_index_stat_list &list= ndb_index_stat_list[lt];
@@ -1088,10 +1183,10 @@ ndb_index_stat_proc_new(Ndb_index_stat_p
assert(pr.lt != lt);
ndb_index_stat_list_move(st, pr.lt);
}
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
}
void
@@ -1126,9 +1221,9 @@ ndb_index_stat_proc_update(Ndb_index_sta
assert(pr.lt != lt);
ndb_index_stat_list_move(st, pr.lt);
// db op so update status after each
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
cnt++;
}
if (cnt == batch)
@@ -1141,7 +1236,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
NdbIndexStat::Head head;
if (st->is->read_stat(pr.ndb) == -1)
{
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
ndb_index_stat_error(st, "read_stat", __LINE__);
const bool force_update= st->force_update;
ndb_index_stat_force_update(st, false);
@@ -1158,12 +1253,12 @@ ndb_index_stat_proc_read(Ndb_index_stat_
pr.lt= Ndb_index_stat::LT_Error;
}
- pthread_cond_broadcast(&ndb_index_stat_stat_cond);
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
return;
}
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
pr.now= ndb_index_stat_time();
st->is->get_head(head);
st->load_time= head.m_loadTime;
@@ -1176,8 +1271,8 @@ ndb_index_stat_proc_read(Ndb_index_stat_
ndb_index_stat_cache_move(st);
st->cache_clean= false;
pr.lt= Ndb_index_stat::LT_Idle;
- pthread_cond_broadcast(&ndb_index_stat_stat_cond);
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
void
@@ -1200,9 +1295,9 @@ ndb_index_stat_proc_read(Ndb_index_stat_
assert(pr.lt != lt);
ndb_index_stat_list_move(st, pr.lt);
// db op so update status after each
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
cnt++;
}
if (cnt == batch)
@@ -1221,9 +1316,15 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
st->check_time == 0 ? 0 : st->check_time + check_delay - pr.now;
DBUG_PRINT("index_stat", ("st %s check wait:%lds force update:%u"
- " clean wait:%lds cache clean:%d",
+ " clean wait:%lds cache clean:%d to delete:%d",
st->id, (long)check_wait, st->force_update,
- (long)clean_wait, st->cache_clean));
+ (long)clean_wait, st->cache_clean, st->to_delete));
+
+ if (st->to_delete)
+ {
+ pr.lt= Ndb_index_stat::LT_Delete;
+ return;
+ }
if (!st->cache_clean && clean_wait <= 0)
{
@@ -1260,7 +1361,7 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
uint batch= opt.get(Ndb_index_stat_opt::Iidle_batch);
{
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
const int lt_update= Ndb_index_stat::LT_Update;
const Ndb_index_stat_list &list_update= ndb_index_stat_list[lt_update];
@@ -1269,7 +1370,7 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
// probably there is a force update waiting on Idle list
batch= ~0;
}
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
// entry may be moved to end of this list
if (batch > list.count)
@@ -1289,9 +1390,9 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
cnt++;
}
// full batch does not set pr.busy
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
void
@@ -1348,9 +1449,9 @@ ndb_index_stat_proc_check(Ndb_index_stat
assert(pr.lt != lt);
ndb_index_stat_list_move(st, pr.lt);
// db op so update status after each
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
cnt++;
}
if (cnt == batch)
@@ -1384,9 +1485,9 @@ ndb_index_stat_proc_evict(Ndb_index_stat
ndb_index_stat_cache_move(st);
ndb_index_stat_cache_clean(st);
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
bool
@@ -1493,6 +1594,13 @@ ndb_index_stat_proc_delete(Ndb_index_sta
Ndb_index_stat *st= st_loop;
st_loop= st_loop->list_next;
DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+
+ // adjust global counters at drop
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_force_update(st, false);
+ ndb_index_stat_no_stats(st, false);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+
ndb_index_stat_proc_evict(pr, st);
ndb_index_stat_list_remove(st);
delete st->is;
@@ -1502,9 +1610,9 @@ ndb_index_stat_proc_delete(Ndb_index_sta
if (cnt == batch)
pr.busy= true;
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
void
@@ -1514,6 +1622,12 @@ ndb_index_stat_proc_error(Ndb_index_stat
const int error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
const time_t error_wait= st->error_time + error_delay - pr.now;
+ if (st->to_delete)
+ {
+ pr.lt= Ndb_index_stat::LT_Delete;
+ return;
+ }
+
if (error_wait <= 0 ||
/* Analyze issued after previous error */
st->force_update)
@@ -1558,9 +1672,9 @@ ndb_index_stat_proc_error(Ndb_index_stat
cnt++;
}
// full batch does not set pr.busy
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
void
@@ -1633,13 +1747,67 @@ ndb_index_stat_proc_event(Ndb_index_stat
glob.event_miss++;
}
}
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+}
+
+/* Control options */
+
+void
+ndb_index_stat_proc_control(Ndb_index_stat_proc &pr)
+{
+ Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+ Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+
+ /* Request to zero accumulating counters */
+ if (opt.get(Ndb_index_stat_opt::Izero_total) == true)
+ {
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ glob.zero_total();
+ glob.set_status();
+ opt.set(Ndb_index_stat_opt::Izero_total, false);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ }
}
#ifndef DBUG_OFF
void
+ndb_index_stat_entry_verify(const Ndb_index_stat *st)
+{
+ const NDB_SHARE *share= st->share;
+ if (st->to_delete)
+ {
+ assert(st->share_next == 0);
+ assert(share == 0);
+ }
+ else
+ {
+ assert(share != 0);
+ const Ndb_index_stat *st2= share->index_stat_list;
+ assert(st2 != 0);
+ uint found= 0;
+ while (st2 != 0)
+ {
+ assert(st2->share == share);
+ const Ndb_index_stat *st3= st2->share_next;
+ uint guard= 0;
+ while (st3 != 0)
+ {
+ assert(st2 != st3);
+ guard++;
+ assert(guard <= 1000); // MAX_INDEXES
+ st3= st3->share_next;
+ }
+ if (st == st2)
+ found++;
+ st2= st2->share_next;
+ }
+ assert(found == 1);
+ }
+}
+
+void
ndb_index_stat_list_verify(int lt)
{
const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
@@ -1684,6 +1852,7 @@ ndb_index_stat_list_verify(int lt)
assert(guard <= list.count);
st2= st2->list_next;
}
+ ndb_index_stat_entry_verify(st);
st= st->list_next;
}
assert(count == list.count);
@@ -1692,10 +1861,10 @@ ndb_index_stat_list_verify(int lt)
void
ndb_index_stat_list_verify()
{
- pthread_mutex_lock(&ndb_index_stat_list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
for (int lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
ndb_index_stat_list_verify(lt);
- pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
}
void
@@ -1717,6 +1886,9 @@ void
ndb_index_stat_proc(Ndb_index_stat_proc &pr)
{
DBUG_ENTER("ndb_index_stat_proc");
+
+ ndb_index_stat_proc_control(pr);
+
#ifndef DBUG_OFF
ndb_index_stat_list_verify();
Ndb_index_stat_glob old_glob= ndb_index_stat_glob;
@@ -1751,9 +1923,9 @@ ndb_index_stat_end()
* in LT_Delete. The first two steps here should be unnecessary.
*/
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
ndb_index_stat_allow(0);
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
int lt;
for (lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
@@ -1889,14 +2061,13 @@ ndb_index_stat_stop_listener(Ndb_index_s
DBUG_RETURN(0);
}
-pthread_handler_t
-ndb_index_stat_thread_func(void *arg __attribute__((unused)))
+void
+Ndb_index_stat_thread::do_run()
{
THD *thd; /* needs to be first for thread_stack */
struct timespec abstime;
Thd_ndb *thd_ndb= NULL;
- my_thread_init();
DBUG_ENTER("ndb_index_stat_thread_func");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
@@ -1906,18 +2077,16 @@ ndb_index_stat_thread_func(void *arg __a
have_listener= false;
// wl4124_todo remove useless stuff copied from utility thread
-
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
thd= new THD; /* note that contructor of THD uses DBUG_ */
if (thd == NULL)
{
my_errno= HA_ERR_OUT_OF_MEM;
- DBUG_RETURN(NULL);
+ DBUG_VOID_RETURN;
}
THD_CHECK_SENTRY(thd);
- pthread_detach_this_thread();
- ndb_index_stat_thread= pthread_self();
thd->thread_stack= (char*)&thd; /* remember where our stack is */
if (thd->store_globals())
@@ -1940,9 +2109,9 @@ ndb_index_stat_thread_func(void *arg __a
thd->update_charset();
/* Signal successful initialization */
- ndb_index_stat_thread_running= 1;
- pthread_cond_signal(&COND_ndb_index_stat_ready);
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ ndb_index_stat_thread.running= 1;
+ pthread_cond_signal(&ndb_index_stat_thread.COND_ready);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
/*
wait for mysql server to start
@@ -1956,7 +2125,7 @@ ndb_index_stat_thread_func(void *arg __a
if (ndbcluster_terminating)
{
mysql_mutex_unlock(&LOCK_server_started);
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
goto ndb_index_stat_thread_end;
}
}
@@ -1965,21 +2134,21 @@ ndb_index_stat_thread_func(void *arg __a
/*
Wait for cluster to start
*/
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
{
/* ndb not connected yet */
- pthread_cond_wait(&COND_ndb_index_stat_thread, &LOCK_ndb_index_stat_thread);
+ pthread_cond_wait(&ndb_index_stat_thread.COND, &ndb_index_stat_thread.LOCK);
if (ndbcluster_terminating)
goto ndb_index_stat_thread_end;
}
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
/* Get instance used for sys objects check and create */
if (!(pr.is_util= new NdbIndexStat))
{
sql_print_error("Could not allocate NdbIndexStat is_util object");
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
goto ndb_index_stat_thread_end;
}
@@ -1987,7 +2156,7 @@ ndb_index_stat_thread_func(void *arg __a
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb(thd)))
{
sql_print_error("Could not allocate Thd_ndb object");
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
goto ndb_index_stat_thread_end;
}
set_thd_ndb(thd, thd_ndb);
@@ -1996,19 +2165,19 @@ ndb_index_stat_thread_func(void *arg __a
{
sql_print_error("Could not change index stats thd_ndb database to %s",
NDB_INDEX_STAT_DB);
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
goto ndb_index_stat_thread_end;
}
pr.ndb= thd_ndb->ndb;
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
ndb_index_stat_allow(1);
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
/* Fill in initial status variable */
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
bool enable_ok;
enable_ok= false;
@@ -2016,10 +2185,10 @@ ndb_index_stat_thread_func(void *arg __a
set_timespec(abstime, 0);
for (;;)
{
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
if (!ndbcluster_terminating && ndb_index_stat_waiter == false) {
- int ret= pthread_cond_timedwait(&COND_ndb_index_stat_thread,
- &LOCK_ndb_index_stat_thread,
+ int ret= pthread_cond_timedwait(&ndb_index_stat_thread.COND,
+ &ndb_index_stat_thread.LOCK,
&abstime);
const char* reason= ret == ETIMEDOUT ? "timed out" : "wake up";
(void)reason; // USED
@@ -2028,7 +2197,7 @@ ndb_index_stat_thread_func(void *arg __a
if (ndbcluster_terminating) /* Shutting down server */
goto ndb_index_stat_thread_end;
ndb_index_stat_waiter= false;
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
/* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
@@ -2089,9 +2258,9 @@ ndb_index_stat_thread_func(void *arg __a
glob.th_enable= enable_ok;
glob.th_busy= pr.busy;
glob.th_loop= msecs;
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
ndb_index_stat_thread_end:
@@ -2117,15 +2286,12 @@ ndb_index_stat_thread_fail:
delete thd;
/* signal termination */
- ndb_index_stat_thread_running= 0;
- pthread_cond_signal(&COND_ndb_index_stat_ready);
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ ndb_index_stat_thread.running= 0;
+ pthread_cond_signal(&ndb_index_stat_thread.COND_ready);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
DBUG_PRINT("exit", ("ndb_index_stat_thread"));
DBUG_LEAVE;
- my_thread_end();
- pthread_exit(0);
- return NULL;
}
/* Optimizer queries */
@@ -2151,7 +2317,7 @@ ndb_index_stat_wait(Ndb_index_stat *st,
DBUG_ENTER("ndb_index_stat_wait");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
int err= 0;
uint count= 0;
struct timespec abstime;
@@ -2161,9 +2327,15 @@ ndb_index_stat_wait(Ndb_index_stat *st,
if (count == 0)
{
if (!from_analyze)
+ {
glob.wait_stats++;
+ glob.query_count++;
+ }
else
+ {
glob.wait_update++;
+ glob.analyze_count++;
+ }
if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
{
err= Ndb_index_stat_error_HAS_ERROR;
@@ -2175,25 +2347,30 @@ ndb_index_stat_wait(Ndb_index_stat *st,
{
/* Have detected no stats now or before */
err= NdbIndexStat::NoIndexStats;
+ glob.query_no_stats++;
break;
}
if (st->error.code != 0)
{
/* A new error has occured */
err= st->error.code;
+ if (!from_analyze)
+ glob.query_error++;
+ else
+ glob.analyze_error++;
break;
}
if (st->sample_version > sample_version)
break;
DBUG_PRINT("index_stat", ("st %s wait count:%u",
st->id, ++count));
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
ndb_index_stat_waiter= true;
- pthread_cond_signal(&COND_ndb_index_stat_thread);
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ pthread_cond_signal(&ndb_index_stat_thread.COND);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
set_timespec(abstime, 1);
- ret= pthread_cond_timedwait(&ndb_index_stat_stat_cond,
- &ndb_index_stat_stat_mutex,
+ ret= pthread_cond_timedwait(&ndb_index_stat_thread.stat_cond,
+ &ndb_index_stat_thread.stat_mutex,
&abstime);
if (ret != 0 && ret != ETIMEDOUT)
{
@@ -2211,7 +2388,7 @@ ndb_index_stat_wait(Ndb_index_stat *st,
assert(glob.wait_update != 0);
glob.wait_update--;
}
- pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
if (err != 0)
{
DBUG_PRINT("index_stat", ("st %s wait error: %d",
@@ -2257,9 +2434,9 @@ ha_ndbcluster::ndb_index_stat_query(uint
if (st->read_time == 0)
{
DBUG_PRINT("index_stat", ("no index stats"));
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
- pthread_cond_signal(&COND_ndb_index_stat_thread);
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+ pthread_cond_signal(&ndb_index_stat_thread.COND);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
DBUG_RETURN(NdbIndexStat::NoIndexStats);
}
=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h 2011-10-08 16:54:19 +0000
+++ b/sql/ha_ndb_index_stat.h 2011-11-10 10:35:09 +0000
@@ -15,22 +15,43 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
-/* provides declarations only to index_stat.cc */
+#ifndef HA_NDB_INDEX_STAT_H
+#define HA_NDB_INDEX_STAT_H
-extern struct st_ndb_status g_ndb_status;
+#include "ndb_component.h"
-extern pthread_mutex_t ndbcluster_mutex;
+/* for NdbIndexScanOperation::IndexBound */
+#include <ndbapi/NdbIndexScanOperation.hpp>
-extern pthread_t ndb_index_stat_thread;
-extern pthread_cond_t COND_ndb_index_stat_thread;
-extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-
-/* protect entry lists where needed */
-extern pthread_mutex_t ndb_index_stat_list_mutex;
-
-/* protect and signal changes in stats entries */
-extern pthread_mutex_t ndb_index_stat_stat_mutex;
-extern pthread_cond_t ndb_index_stat_stat_cond;
+/* forward declarations */
+struct st_key_range;
+typedef struct st_key_range key_range;
+struct st_key;
+typedef struct st_key KEY;
+
+class Ndb_index_stat_thread : public Ndb_component
+{
+public:
+ Ndb_index_stat_thread();
+ virtual ~Ndb_index_stat_thread();
+
+ int running;
+ pthread_mutex_t LOCK;
+ pthread_cond_t COND;
+ pthread_cond_t COND_ready;
+
+ /* protect entry lists where needed */
+ pthread_mutex_t list_mutex;
+
+ /* protect and signal changes in stats entries */
+ pthread_mutex_t stat_mutex;
+ pthread_cond_t stat_cond;
+
+private:
+ virtual int do_init() { return 0;}
+ virtual void do_run();
+ virtual int do_deinit() { return 0;}
+};
/* these have to live in ha_ndbcluster.cc */
extern bool ndb_index_stat_get_enable(THD *thd);
@@ -38,7 +59,7 @@ extern const char* g_ndb_status_index_st
extern long g_ndb_status_index_stat_cache_query;
extern long g_ndb_status_index_stat_cache_clean;
-void
+void
compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
const KEY *key_info,
const key_range *start_key, const key_range *end_key,
@@ -54,3 +75,5 @@ compute_index_bounds(NdbIndexScanOperati
/* request on stats entry with recent error was ignored */
#define Ndb_index_stat_error_HAS_ERROR 9003
+
+#endif
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-10-23 07:38:10 +0000
+++ b/sql/ha_ndbcluster.cc 2011-11-10 14:23:53 +0000
@@ -46,6 +46,8 @@
#include <ndb_version.h>
#include "ndb_mi.h"
#include "ndb_conflict_trans.h"
+#include "ndb_component.h"
+#include "ndb_util_thread.h"
#ifdef ndb_dynamite
#undef assert
@@ -420,25 +422,7 @@ static int ndb_get_table_statistics(THD
THD *injector_thd= 0;
-// Util thread variables
-pthread_t ndb_util_thread;
-int ndb_util_thread_running= 0;
-pthread_mutex_t LOCK_ndb_util_thread;
-pthread_cond_t COND_ndb_util_thread;
-pthread_cond_t COND_ndb_util_ready;
-pthread_handler_t ndb_util_thread_func(void *arg);
-
// Index stats thread variables
-pthread_t ndb_index_stat_thread;
-int ndb_index_stat_thread_running= 0;
-pthread_mutex_t LOCK_ndb_index_stat_thread;
-pthread_cond_t COND_ndb_index_stat_thread;
-pthread_cond_t COND_ndb_index_stat_ready;
-pthread_mutex_t ndb_index_stat_list_mutex;
-pthread_mutex_t ndb_index_stat_stat_mutex;
-pthread_cond_t ndb_index_stat_stat_cond;
-pthread_handler_t ndb_index_stat_thread_func(void *arg);
-
extern void ndb_index_stat_free(NDB_SHARE *share);
extern void ndb_index_stat_end();
@@ -12251,7 +12235,7 @@ ndbcluster_find_files(handlerton *hton,
/* Call back after cluster connect */
static int connect_callback()
{
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
update_status_variables(NULL, &g_ndb_status,
g_ndb_cluster_connection);
@@ -12261,8 +12245,8 @@ static int connect_callback()
while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
g_node_id_map[node_id]= i++;
- pthread_cond_signal(&COND_ndb_util_thread);
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ pthread_cond_signal(&ndb_util_thread.COND);
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
return 0;
}
@@ -12308,6 +12292,12 @@ int(*ndb_wait_setup_func)(ulong) = 0;
#endif
extern int ndb_dictionary_is_mysqld;
+/**
+ * Components
+ */
+Ndb_util_thread ndb_util_thread;
+Ndb_index_stat_thread ndb_index_stat_thread;
+
static int ndbcluster_init(void *p)
{
DBUG_ENTER("ndbcluster_init");
@@ -12320,20 +12310,11 @@ static int ndbcluster_init(void *p)
assert(DependencyTracker::InvalidTransactionId ==
Ndb_binlog_extra_row_info::InvalidTransactionId);
#endif
+ ndb_util_thread.init();
+ ndb_index_stat_thread.init();
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&COND_ndb_util_thread, NULL);
- pthread_cond_init(&COND_ndb_util_ready, NULL);
pthread_cond_init(&COND_ndb_setup_complete, NULL);
- ndb_util_thread_running= -1;
- pthread_mutex_init(&LOCK_ndb_index_stat_thread, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&COND_ndb_index_stat_thread, NULL);
- pthread_cond_init(&COND_ndb_index_stat_ready, NULL);
- pthread_mutex_init(&ndb_index_stat_list_mutex, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&ndb_index_stat_stat_mutex, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&ndb_index_stat_stat_cond, NULL);
- ndb_index_stat_thread_running= -1;
ndbcluster_terminating= 0;
ndb_dictionary_is_mysqld= 1;
ndb_setup_complete= 0;
@@ -12397,72 +12378,53 @@ static int ndbcluster_init(void *p)
}
// Create utility thread
- pthread_t tmp;
- if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
+ if (ndb_util_thread.start())
{
DBUG_PRINT("error", ("Could not create ndb utility thread"));
my_hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex);
- pthread_mutex_destroy(&LOCK_ndb_util_thread);
- pthread_cond_destroy(&COND_ndb_util_thread);
- pthread_cond_destroy(&COND_ndb_util_ready);
pthread_cond_destroy(&COND_ndb_setup_complete);
ndbcluster_global_schema_lock_deinit();
goto ndbcluster_init_error;
}
/* Wait for the util thread to start */
- pthread_mutex_lock(&LOCK_ndb_util_thread);
- while (ndb_util_thread_running < 0)
- pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
+ while (ndb_util_thread.running < 0)
+ pthread_cond_wait(&ndb_util_thread.COND_ready, &ndb_util_thread.LOCK);
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
- if (!ndb_util_thread_running)
+ if (!ndb_util_thread.running)
{
DBUG_PRINT("error", ("ndb utility thread exited prematurely"));
my_hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex);
- pthread_mutex_destroy(&LOCK_ndb_util_thread);
- pthread_cond_destroy(&COND_ndb_util_thread);
- pthread_cond_destroy(&COND_ndb_util_ready);
pthread_cond_destroy(&COND_ndb_setup_complete);
ndbcluster_global_schema_lock_deinit();
goto ndbcluster_init_error;
}
// Create index statistics thread
- pthread_t tmp2;
- if (pthread_create(&tmp2, &connection_attrib, ndb_index_stat_thread_func, 0))
+ if (ndb_index_stat_thread.start())
{
DBUG_PRINT("error", ("Could not create ndb index statistics thread"));
my_hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex);
- pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
- pthread_cond_destroy(&COND_ndb_index_stat_thread);
- pthread_cond_destroy(&COND_ndb_index_stat_ready);
- pthread_mutex_destroy(&ndb_index_stat_list_mutex);
- pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
- pthread_cond_destroy(&ndb_index_stat_stat_cond);
goto ndbcluster_init_error;
}
/* Wait for the index statistics thread to start */
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
- while (ndb_index_stat_thread_running < 0)
- pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
-
- if (!ndb_index_stat_thread_running)
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+ while (ndb_index_stat_thread.running < 0)
+ pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+ &ndb_index_stat_thread.LOCK);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
+
+ if (!ndb_index_stat_thread.running)
{
DBUG_PRINT("error", ("ndb index statistics thread exited prematurely"));
my_hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex);
- pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
- pthread_cond_destroy(&COND_ndb_index_stat_thread);
- pthread_cond_destroy(&COND_ndb_index_stat_ready);
- pthread_mutex_destroy(&ndb_index_stat_list_mutex);
- pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
- pthread_cond_destroy(&ndb_index_stat_stat_cond);
goto ndbcluster_init_error;
}
@@ -12476,6 +12438,8 @@ static int ndbcluster_init(void *p)
DBUG_RETURN(FALSE);
ndbcluster_init_error:
+ ndb_util_thread.deinit();
+ ndb_index_stat_thread.deinit();
/* disconnect from cluster and free connection resources */
ndbcluster_disconnect();
ndbcluster_hton->state= SHOW_OPTION_DISABLED; // If we couldn't use handler
@@ -12513,12 +12477,13 @@ static int ndbcluster_end(handlerton *ht
/* wait for index stat thread to finish */
sql_print_information("Stopping Cluster Index Statistics thread");
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
ndbcluster_terminating= 1;
- pthread_cond_signal(&COND_ndb_index_stat_thread);
- while (ndb_index_stat_thread_running > 0)
- pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ pthread_cond_signal(&ndb_index_stat_thread.COND);
+ while (ndb_index_stat_thread.running > 0)
+ pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+ &ndb_index_stat_thread.LOCK);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
/* wait for util and binlog thread to finish */
ndbcluster_binlog_end(NULL);
@@ -12547,17 +12512,13 @@ static int ndbcluster_end(handlerton *ht
ndb_index_stat_end();
ndbcluster_disconnect();
+ ndb_util_thread.deinit();
+
// cleanup ndb interface
ndb_end_internal();
pthread_mutex_destroy(&ndbcluster_mutex);
- pthread_mutex_destroy(&LOCK_ndb_util_thread);
- pthread_cond_destroy(&COND_ndb_util_thread);
- pthread_cond_destroy(&COND_ndb_util_ready);
pthread_cond_destroy(&COND_ndb_setup_complete);
- pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
- pthread_cond_destroy(&COND_ndb_index_stat_thread);
- pthread_cond_destroy(&COND_ndb_index_stat_ready);
ndbcluster_global_schema_lock_deinit();
DBUG_RETURN(0);
}
@@ -14776,7 +14737,24 @@ ha_ndbcluster::update_table_comment(
/**
Utility thread main loop.
*/
-pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
+Ndb_util_thread::Ndb_util_thread()
+ : running(-1)
+{
+ pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND, NULL);
+ pthread_cond_init(&COND_ready, NULL);
+}
+
+Ndb_util_thread::~Ndb_util_thread()
+{
+ assert(running <= 0);
+ pthread_mutex_destroy(&LOCK);
+ pthread_cond_destroy(&COND);
+ pthread_cond_destroy(&COND_ready);
+}
+
+void
+Ndb_util_thread::do_run()
{
THD *thd; /* needs to be first for thread_stack */
struct timespec abstime;
@@ -14784,21 +14762,18 @@ pthread_handler_t ndb_util_thread_func(v
uint share_list_size= 0;
NDB_SHARE **share_list= NULL;
- my_thread_init();
DBUG_ENTER("ndb_util_thread");
DBUG_PRINT("enter", ("cache_check_time: %lu", opt_ndb_cache_check_time));
-
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
thd= new THD; /* note that contructor of THD uses DBUG_ */
if (thd == NULL)
{
my_errno= HA_ERR_OUT_OF_MEM;
- DBUG_RETURN(NULL);
+ DBUG_VOID_RETURN;
}
THD_CHECK_SENTRY(thd);
- pthread_detach_this_thread();
- ndb_util_thread= pthread_self();
thd->thread_stack= (char*)&thd; /* remember where our stack is */
if (thd->store_globals())
@@ -14821,9 +14796,9 @@ pthread_handler_t ndb_util_thread_func(v
thd->update_charset();
/* Signal successful initialization */
- ndb_util_thread_running= 1;
- pthread_cond_signal(&COND_ndb_util_ready);
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ ndb_util_thread.running= 1;
+ pthread_cond_signal(&ndb_util_thread.COND_ready);
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
/*
wait for mysql server to start
@@ -14837,7 +14812,7 @@ pthread_handler_t ndb_util_thread_func(v
if (ndbcluster_terminating)
{
mysql_mutex_unlock(&LOCK_server_started);
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
goto ndb_util_thread_end;
}
}
@@ -14846,21 +14821,21 @@ pthread_handler_t ndb_util_thread_func(v
/*
Wait for cluster to start
*/
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
{
/* ndb not connected yet */
- pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread);
+ pthread_cond_wait(&ndb_util_thread.COND, &ndb_util_thread.LOCK);
if (ndbcluster_terminating)
goto ndb_util_thread_end;
}
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
/* Get thd_ndb for this thread */
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb(thd)))
{
sql_print_error("Could not allocate Thd_ndb object");
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
goto ndb_util_thread_end;
}
set_thd_ndb(thd, thd_ndb);
@@ -14872,14 +14847,14 @@ pthread_handler_t ndb_util_thread_func(v
set_timespec(abstime, 0);
for (;;)
{
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
if (!ndbcluster_terminating)
- pthread_cond_timedwait(&COND_ndb_util_thread,
- &LOCK_ndb_util_thread,
+ pthread_cond_timedwait(&ndb_util_thread.COND,
+ &ndb_util_thread.LOCK,
&abstime);
if (ndbcluster_terminating) /* Shutting down server */
goto ndb_util_thread_end;
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
#ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
DBUG_PRINT("ndb_util_thread", ("Started, cache_check_time: %lu",
opt_ndb_cache_check_time));
@@ -15032,7 +15007,7 @@ next:
set_timespec_nsec(abstime, opt_ndb_cache_check_time * 1000000ULL);
}
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
ndb_util_thread_end:
net_end(&thd->net);
@@ -15048,15 +15023,12 @@ ndb_util_thread_fail:
delete thd;
/* signal termination */
- ndb_util_thread_running= 0;
- pthread_cond_signal(&COND_ndb_util_ready);
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ ndb_util_thread.running= 0;
+ pthread_cond_signal(&ndb_util_thread.COND_ready);
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
DBUG_PRINT("exit", ("ndb_util_thread"));
DBUG_LEAVE; // Must match DBUG_ENTER()
- my_thread_end();
- pthread_exit(0);
- return NULL; // Avoid compiler warnings
}
/*
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h 2011-10-17 16:46:12 +0000
+++ b/sql/ha_ndbcluster.h 2011-11-10 14:23:53 +0000
@@ -1090,7 +1090,9 @@ void ndbcluster_print_error(int error, c
static const char ndbcluster_hton_name[]= "ndbcluster";
static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
extern int ndbcluster_terminating;
-extern int ndb_util_thread_running;
-extern pthread_cond_t COND_ndb_util_ready;
-extern int ndb_index_stat_thread_running;
-extern pthread_cond_t COND_ndb_index_stat_ready;
+
+#include "ndb_util_thread.h"
+extern Ndb_util_thread ndb_util_thread;
+
+#include "ha_ndb_index_stat.h"
+extern Ndb_index_stat_thread ndb_index_stat_thread;
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2011-10-20 19:18:20 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2011-11-10 14:23:53 +0000
@@ -812,7 +812,7 @@ int ndbcluster_binlog_end(THD *thd)
{
DBUG_ENTER("ndbcluster_binlog_end");
- if (ndb_util_thread_running > 0)
+ if (ndb_util_thread.running > 0)
{
/*
Wait for util thread to die (as this uses the injector mutex)
@@ -822,33 +822,34 @@ int ndbcluster_binlog_end(THD *thd)
be called before ndb_cluster_end().
*/
sql_print_information("Stopping Cluster Utility thread");
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ pthread_mutex_lock(&ndb_util_thread.LOCK);
/* Ensure mutex are not freed if ndb_cluster_end is running at same time */
- ndb_util_thread_running++;
+ ndb_util_thread.running++;
ndbcluster_terminating= 1;
- pthread_cond_signal(&COND_ndb_util_thread);
- while (ndb_util_thread_running > 1)
- pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
- ndb_util_thread_running--;
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ pthread_cond_signal(&ndb_util_thread.COND);
+ while (ndb_util_thread.running > 1)
+ pthread_cond_wait(&ndb_util_thread.COND_ready, &ndb_util_thread.LOCK);
+ ndb_util_thread.running--;
+ pthread_mutex_unlock(&ndb_util_thread.LOCK);
}
- if (ndb_index_stat_thread_running > 0)
+ if (ndb_index_stat_thread.running > 0)
{
/*
Index stats thread blindly imitates util thread. Following actually
fixes some "[Warning] Plugin 'ndbcluster' will be forced to shutdown".
*/
sql_print_information("Stopping Cluster Index Stats thread");
- pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
/* Ensure mutex are not freed if ndb_cluster_end is running at same time */
- ndb_index_stat_thread_running++;
+ ndb_index_stat_thread.running++;
ndbcluster_terminating= 1;
- pthread_cond_signal(&COND_ndb_index_stat_thread);
- while (ndb_index_stat_thread_running > 1)
- pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
- ndb_index_stat_thread_running--;
- pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+ pthread_cond_signal(&ndb_index_stat_thread.COND);
+ while (ndb_index_stat_thread.running > 1)
+ pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+ &ndb_index_stat_thread.LOCK);
+ ndb_index_stat_thread.running--;
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
}
if (ndbcluster_binlog_inited)
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2011-10-20 19:18:20 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2011-11-10 14:23:53 +0000
@@ -191,10 +191,6 @@ void ndbcluster_global_schema_lock_init(
void ndbcluster_global_schema_lock_deinit();
extern unsigned char g_node_id_map[max_ndb_nodes];
-extern pthread_mutex_t LOCK_ndb_util_thread;
-extern pthread_cond_t COND_ndb_util_thread;
-extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-extern pthread_cond_t COND_ndb_index_stat_thread;
extern pthread_mutex_t ndbcluster_mutex;
extern HASH ndbcluster_open_tables;
=== added file 'sql/ndb_component.cc'
--- a/sql/ndb_component.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_component.cc 2011-11-10 08:16:52 +0000
@@ -0,0 +1,143 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "ndb_component.h"
+
+Ndb_component::Ndb_component()
+ : m_thread_state(TS_UNINIT)
+{
+}
+
+Ndb_component::~Ndb_component()
+{
+
+}
+
+int
+Ndb_component::init()
+{
+ assert(m_thread_state == TS_UNINIT);
+
+ pthread_mutex_init(&m_start_stop_mutex, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&m_start_stop_cond, NULL);
+
+ int res= do_init();
+ if (res == 0)
+ {
+ m_thread_state= TS_INIT;
+ }
+ return res;
+}
+
+void *
+Ndb_component_run_C(void * arg)
+{
+ my_thread_init();
+ Ndb_component * self = reinterpret_cast<Ndb_component*>(arg);
+ self->run_impl();
+ my_thread_end();
+ pthread_exit(0);
+ return NULL; // Avoid compiler warnings
+}
+
+extern pthread_attr_t connection_attrib; // mysql global pthread attr
+
+int
+Ndb_component::start()
+{
+ assert(m_thread_state == TS_INIT);
+ pthread_mutex_lock(&m_start_stop_mutex);
+ m_thread_state= TS_STARTING;
+ int res= pthread_create(&m_thread, &connection_attrib, Ndb_component_run_C,
+ this);
+
+ if (res == 0)
+ {
+ while (m_thread_state == TS_STARTING)
+ {
+ pthread_cond_wait(&m_start_stop_cond, &m_start_stop_mutex);
+ }
+ pthread_mutex_unlock(&m_start_stop_mutex);
+ return m_thread_state == TS_RUNNING ? 0 : 1;
+ }
+
+ pthread_mutex_unlock(&m_start_stop_mutex);
+ return res;
+}
+
+void
+Ndb_component::run_impl()
+{
+ pthread_detach_this_thread();
+ pthread_mutex_lock(&m_start_stop_mutex);
+ if (m_thread_state == TS_STARTING)
+ {
+ m_thread_state= TS_RUNNING;
+ pthread_cond_signal(&m_start_stop_cond);
+ pthread_mutex_unlock(&m_start_stop_mutex);
+ do_run();
+ pthread_mutex_lock(&m_start_stop_mutex);
+ }
+ m_thread_state = TS_STOPPED;
+ pthread_cond_signal(&m_start_stop_cond);
+ pthread_mutex_unlock(&m_start_stop_mutex);
+}
+
+bool
+Ndb_component::is_stop_requested()
+{
+ bool res = false;
+ pthread_mutex_lock(&m_start_stop_mutex);
+ res = m_thread_state != TS_RUNNING;
+ pthread_mutex_unlock(&m_start_stop_mutex);
+ return res;
+}
+
+int
+Ndb_component::stop()
+{
+ pthread_mutex_lock(&m_start_stop_mutex);
+ assert(m_thread_state == TS_RUNNING ||
+ m_thread_state == TS_STOPPING ||
+ m_thread_state == TS_STOPPED);
+
+ if (m_thread_state == TS_RUNNING)
+ {
+ m_thread_state= TS_STOPPING;
+ }
+
+ if (m_thread_state == TS_STOPPING)
+ {
+ while (m_thread_state != TS_STOPPED)
+ {
+ pthread_cond_signal(&m_start_stop_cond);
+ pthread_cond_wait(&m_start_stop_cond, &m_start_stop_mutex);
+ }
+ }
+ pthread_mutex_unlock(&m_start_stop_mutex);
+
+ return 0;
+}
+
+int
+Ndb_component::deinit()
+{
+ assert(m_thread_state == TS_STOPPED);
+ pthread_mutex_destroy(&m_start_stop_mutex);
+ pthread_cond_destroy(&m_start_stop_cond);
+ return do_deinit();
+}
=== added file 'sql/ndb_component.h'
--- a/sql/ndb_component.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_component.h 2011-11-10 08:16:52 +0000
@@ -0,0 +1,82 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef HA_NDBCLUSTER_COMPONENT_H
+#define HA_NDBCLUSTER_COMPONENT_H
+
+#include <my_global.h>
+#include <my_pthread.h>
+
+extern "C" void * Ndb_component_run_C(void *);
+
+class Ndb_component
+{
+public:
+ virtual int init();
+ virtual int start();
+ virtual int stop();
+ virtual int deinit();
+
+protected:
+ /**
+ * Con/de-structor is protected...so that sub-class needs to provide own
+ */
+ Ndb_component();
+ virtual ~Ndb_component();
+
+ /**
+ * Component init function
+ */
+ virtual int do_init() = 0;
+
+ /**
+ * Component run function
+ */
+ virtual void do_run() = 0;
+
+ /**
+ * Component deinit function
+ */
+ virtual int do_deinit() = 0;
+
+ /**
+ * For usage in threads main loop
+ */
+ bool is_stop_requested();
+
+private:
+
+ enum ThreadState
+ {
+ TS_UNINIT = 0,
+ TS_INIT = 1,
+ TS_STARTING = 2,
+ TS_RUNNING = 3,
+ TS_STOPPING = 4,
+ TS_STOPPED = 5
+ };
+
+ ThreadState m_thread_state;
+ pthread_t m_thread;
+ pthread_mutex_t m_start_stop_mutex;
+ pthread_cond_t m_start_stop_cond;
+
+ void run_impl();
+ friend void * Ndb_component_run_C(void *);
+};
+
+#endif
=== added file 'sql/ndb_util_thread.h'
--- a/sql/ndb_util_thread.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_util_thread.h 2011-11-10 08:16:52 +0000
@@ -0,0 +1,40 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_UTIL_THREAD_H
+#define NDB_UTIL_THREAD_H
+
+#include "ndb_component.h"
+
+class Ndb_util_thread : public Ndb_component
+{
+public:
+ Ndb_util_thread();
+ virtual ~Ndb_util_thread();
+
+ int running;
+ pthread_mutex_t LOCK;
+ pthread_cond_t COND;
+ pthread_cond_t COND_ready;
+
+private:
+ virtual int do_init() { return 0;}
+ virtual void do_run();
+ virtual int do_deinit() { return 0;}
+};
+
+#endif
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2011-09-08 06:22:07 +0000
+++ b/storage/ndb/CMakeLists.txt 2011-11-10 14:23:53 +0000
@@ -178,7 +178,8 @@ SET(NDBCLUSTER_SOURCES
../../sql/ha_ndb_index_stat.cc
../../sql/ha_ndbinfo.cc
../../sql/ndb_mi.cc
- ../../sql/ndb_conflict_trans.cc)
+ ../../sql/ndb_conflict_trans.cc
+ ../../sql/ndb_component.cc)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
IF(EXISTS ${CMAKE_SOURCE_DIR}/storage/mysql_storage_engine.cmake)
=== modified file 'storage/ndb/include/kernel/signaldata/ScanFrag.hpp'
--- a/storage/ndb/include/kernel/signaldata/ScanFrag.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/ScanFrag.hpp 2011-11-09 13:10:53 +0000
@@ -177,7 +177,7 @@ public:
Uint32 fragmentCompleted;
Uint32 transId1;
Uint32 transId2;
- Uint32 total_len;
+ Uint32 total_len; // Total #Uint32 returned as TRANSID_AI
};
class ScanFragRef {
=== modified file 'storage/ndb/include/kernel/signaldata/TupKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/TupKey.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/TupKey.hpp 2011-11-09 13:10:53 +0000
@@ -91,7 +91,7 @@ private:
* DATA VARIABLES
*/
Uint32 userPtr;
- Uint32 readLength;
+ Uint32 readLength; // Length in Uint32 words
Uint32 writeLength;
Uint32 noFiredTriggers;
Uint32 lastRow;
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-08-17 12:36:56 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp 2011-11-09 13:10:53 +0000
@@ -105,16 +105,13 @@ private:
static
void calculate_batch_size(const NdbImpl&,
- const NdbRecord *,
- const NdbRecAttr *first_rec_attr,
- Uint32, Uint32, Uint32&, Uint32&, Uint32&);
-
- void calculate_batch_size(Uint32 key_size,
Uint32 parallelism,
Uint32& batch_size,
- Uint32& batch_byte_size,
- Uint32& first_batch_size,
- const NdbRecord *rec) const;
+ Uint32& batch_byte_size);
+
+ void calculate_batch_size(Uint32 parallelism,
+ Uint32& batch_size,
+ Uint32& batch_byte_size) const;
/*
Set up buffers for receiving TRANSID_AI and KEYINFO20 signals
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-10-28 16:37:39 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-11-10 14:23:53 +0000
@@ -11205,7 +11205,7 @@ void Dblqh::scanTupkeyConfLab(Signal* si
tdata4 += sendKeyinfo20(signal, scanptr.p, tcConnectptr.p);
}//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
- scanptr.p->m_curr_batch_size_bytes+= tdata4;
+ scanptr.p->m_curr_batch_size_bytes+= tdata4 * sizeof(Uint32);
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){
@@ -11832,6 +11832,7 @@ void Dblqh::releaseScanrec(Signal* signa
/* ------------------------------------------------------------------------
* ------- SEND KEYINFO20 TO API -------
*
+ * Return: Length in number of Uint32 words
* ------------------------------------------------------------------------ */
Uint32 Dblqh::sendKeyinfo20(Signal* signal,
ScanRecord * scanP,
@@ -11968,7 +11969,9 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
Uint32 completed_ops= scanptr.p->m_curr_batch_size_rows;
- Uint32 total_len= scanptr.p->m_curr_batch_size_bytes;
+ Uint32 total_len= scanptr.p->m_curr_batch_size_bytes / sizeof(Uint32);
+ ndbassert((scanptr.p->m_curr_batch_size_bytes % sizeof(Uint32)) == 0);
+
scanptr.p->scanTcWaiting = 0;
if(ERROR_INSERTED(5037)){
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-10-03 07:08:19 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2011-11-10 14:23:53 +0000
@@ -871,6 +871,7 @@ public:
Uint32 m_senderRef;
Uint32 m_senderData;
Uint32 m_rootResultData;
+ Uint32 m_rootFragId;
Uint32 m_transId[2];
TreeNode_list::Head m_nodes;
TreeNodeCursor_list::Head m_cursor_nodes;
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-11-03 10:20:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-11-10 14:23:53 +0000
@@ -482,6 +482,7 @@ Dbspj::do_init(Request* requestP, const
requestP->m_outstanding = 0;
requestP->m_transId[0] = req->transId1;
requestP->m_transId[1] = req->transId2;
+ requestP->m_rootFragId = LqhKeyReq::getFragmentId(req->fragmentData);
bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
requestP->m_cnt_batches = 0;
@@ -777,6 +778,7 @@ Dbspj::do_init(Request* requestP, const
requestP->m_transId[0] = req->transId1;
requestP->m_transId[1] = req->transId2;
requestP->m_rootResultData = req->resultData;
+ requestP->m_rootFragId = req->fragmentNoKeyLen;
bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
requestP->m_cnt_batches = 0;
@@ -2187,12 +2189,6 @@ Dbspj::storeRow(Ptr<Request> requestPtr,
jam();
return DbspjErr::OutOfRowMemory;
}
-
- row.m_type = RowPtr::RT_LINEAR;
- row.m_row_data.m_linear.m_row_ref = ref;
- row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
- row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;
-
memcpy(dstptr + linklen, headptr, 4 * headlen);
copy(dstptr + linklen + headlen, dataPtr);
@@ -2205,9 +2201,30 @@ Dbspj::storeRow(Ptr<Request> requestPtr,
else
{
jam();
- return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
+ Uint32 error = add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
+ if (unlikely(error))
+ return error;
+ }
+
+ /**
+ * Refetch pointer to alloc'ed row memory before creating RowPtr
+ * as above add_to_xxx may mave reorganized memory causing
+ * alloced row to be moved.
+ */
+ Uint32 * rowptr = 0;
+ if (ref.m_allocator == 0)
+ {
+ jam();
+ rowptr = get_row_ptr_stack(ref);
+ }
+ else
+ {
+ jam();
+ rowptr = get_row_ptr_var(ref);
}
+//ndbrequire(rowptr==dstptr); // It moved which we now do handle
+ setupRowPtr(treeNodePtr, row, ref, rowptr);
return 0;
}
@@ -4615,12 +4632,17 @@ Dbspj::execDIH_SCAN_TAB_CONF(Signal* sig
Ptr<Request> requestPtr;
m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
+ // Add a skew in the fragment lists such that we don't scan
+ // the same subset of frags fram all SPJ requests in case of
+ // the scan not being ' T_SCAN_PARALLEL'
+ Uint16 fragNoOffs = requestPtr.p->m_rootFragId % fragCount;
+
Ptr<ScanFragHandle> fragPtr;
Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
{
jam();
- fragPtr.p->init(0);
+ fragPtr.p->init(fragNoOffs);
fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
list.addLast(fragPtr);
}
@@ -4686,10 +4708,11 @@ Dbspj::execDIH_SCAN_TAB_CONF(Signal* sig
{
jam();
Ptr<ScanFragHandle> fragPtr;
+ Uint16 fragNo = (fragNoOffs+i) % fragCount;
if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
{
jam();
- fragPtr.p->init(i);
+ fragPtr.p->init(fragNo);
fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
list.addLast(fragPtr);
}
@@ -5192,6 +5215,7 @@ Dbspj::scanIndex_send(Signal* signal,
jam();
ndbassert(bs_bytes > 0);
ndbassert(bs_rows > 0);
+ ndbassert(bs_rows <= bs_bytes);
/**
* if (m_bits & prunemask):
* - Range keys sliced out to each ScanFragHandle
@@ -5420,6 +5444,7 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
Uint32 rows = conf->completedOps;
Uint32 done = conf->fragmentCompleted;
+ Uint32 bytes = conf->total_len * sizeof(Uint32);
Uint32 state = fragPtr.p->m_state;
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
@@ -5435,9 +5460,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
requestPtr.p->m_rows += rows;
data.m_totalRows += rows;
- data.m_totalBytes += conf->total_len;
+ data.m_totalBytes += bytes;
data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
- data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
+ data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, bytes);
if (!treeNodePtr.p->isLeaf())
{
@@ -5532,37 +5557,43 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
org->batch_size_rows / data.m_parallelism * (data.m_parallelism - 1)
+ data.m_totalRows;
- // Number of rows that we can still fetch in this batch.
+ // Number of rows & bytes that we can still fetch in this batch.
const Int32 remainingRows
= static_cast<Int32>(org->batch_size_rows - maxCorrVal);
-
+ const Int32 remainingBytes
+ = static_cast<Int32>(org->batch_size_bytes - data.m_totalBytes);
+
if (remainingRows >= data.m_frags_not_started &&
+ remainingBytes >= data.m_frags_not_started &&
/**
* Check that (remaning row capacity)/(remaining fragments) is
* greater or equal to (rows read so far)/(finished fragments).
*/
remainingRows * static_cast<Int32>(data.m_parallelism) >=
- static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
- (org->batch_size_bytes - data.m_totalBytes) * data.m_parallelism >=
- data.m_totalBytes * data.m_frags_not_started)
+ static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
+ remainingBytes * static_cast<Int32>(data.m_parallelism) >=
+ static_cast<Int32>(data.m_totalBytes * data.m_frags_not_started))
{
jam();
Uint32 batchRange = maxCorrVal;
+ Uint32 bs_rows = remainingRows / data.m_frags_not_started;
+ Uint32 bs_bytes = remainingBytes / data.m_frags_not_started;
+
DEBUG("::scanIndex_execSCAN_FRAGCONF() first batch was not full."
" Asking for new batches from " << data.m_frags_not_started <<
" fragments with " <<
- remainingRows / data.m_frags_not_started
- <<" rows and " <<
- (org->batch_size_bytes - data.m_totalBytes)
- / data.m_frags_not_started
- << " bytes.");
+ bs_rows <<" rows and " <<
+ bs_bytes << " bytes.");
+
+ if (unlikely(bs_rows > bs_bytes))
+ bs_rows = bs_bytes;
+
scanIndex_send(signal,
requestPtr,
treeNodePtr,
data.m_frags_not_started,
- (org->batch_size_bytes - data.m_totalBytes)
- / data.m_frags_not_started,
- remainingRows / data.m_frags_not_started,
+ bs_bytes,
+ bs_rows,
batchRange);
return;
}
=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2011-10-21 10:58:09 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2011-11-10 14:23:53 +0000
@@ -6795,8 +6795,6 @@ NdbDictionaryImpl::initialiseColumnData(
recCol->orgAttrSize= col->m_orgAttrSize;
if (recCol->offset+recCol->maxSize > rec->m_row_size)
rec->m_row_size= recCol->offset+recCol->maxSize;
- /* Round data size to whole words + 4 bytes of AttributeHeader. */
- rec->m_max_transid_ai_bytes+= (recCol->maxSize+7) & ~3;
recCol->charset_info= col->m_cs;
recCol->compare_function= NdbSqlUtil::getType(col->m_type).m_cmp;
recCol->flags= 0;
@@ -6985,7 +6983,6 @@ NdbDictionaryImpl::createRecord(const Nd
}
rec->m_row_size= 0;
- rec->m_max_transid_ai_bytes= 0;
for (i= 0; i<length; i++)
{
const NdbDictionary::RecordSpecification *rs= &recSpec[i];
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-09-19 10:11:11 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-11-10 14:23:53 +0000
@@ -878,6 +878,11 @@ NdbIndexStatImpl::sys_read_head(Con& con
return -1;
if (sys_head_getvalue(con) == -1)
return -1;
+ if (con.m_op->setAbortOption(NdbOperation::AbortOnError) == -1)
+ {
+ setError(con, __LINE__);
+ return -1;
+ }
if (con.execute(commit) == -1)
{
setError(con, __LINE__);
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-10-28 13:38:36 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-11-09 13:10:53 +0000
@@ -3058,20 +3058,16 @@ NdbQueryImpl::doSend(int nodeId, bool la
scanTabReq->transId2 = (Uint32) (transId >> 32);
Uint32 batchRows = root.getMaxBatchRows();
- Uint32 batchByteSize, firstBatchRows;
+ Uint32 batchByteSize;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
- root.m_ndbRecord,
- root.m_firstRecAttr,
- 0, // Key size.
getRootFragCount(),
batchRows,
- batchByteSize,
- firstBatchRows);
+ batchByteSize);
assert(batchRows==root.getMaxBatchRows());
- assert(batchRows==firstBatchRows);
+ assert(batchRows<=batchByteSize);
ScanTabReq::setScanBatch(reqInfo, batchRows);
scanTabReq->batch_byte_size = batchByteSize;
- scanTabReq->first_batch_size = firstBatchRows;
+ scanTabReq->first_batch_size = batchRows;
ScanTabReq::setViaSPJFlag(reqInfo, 1);
ScanTabReq::setPassAllConfsFlag(reqInfo, 1);
@@ -4361,11 +4357,11 @@ NdbQueryOperationImpl
* We must thus make sure that we do not set a batch size for the scan
* that exceeds what any of its scan descendants can use.
*
- * Ignore calculated 'batchByteSize' and 'firstBatchRows'
+ * Ignore calculated 'batchByteSize'
* here - Recalculated when building signal after max-batchRows has been
* determined.
*/
- Uint32 batchByteSize, firstBatchRows;
+ Uint32 batchByteSize;
/**
* myClosestScan->m_maxBatchRows may be zero to indicate that we
* should use default values, or non-zero if the application had an
@@ -4373,18 +4369,14 @@ NdbQueryOperationImpl
*/
maxBatchRows = myClosestScan->m_maxBatchRows;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
- m_ndbRecord,
- m_firstRecAttr,
- 0, // Key size.
getRoot().m_parallelism
- == Parallelism_max ?
- m_queryImpl.getRootFragCount() :
- getRoot().m_parallelism,
+ == Parallelism_max
+ ? m_queryImpl.getRootFragCount()
+ : getRoot().m_parallelism,
maxBatchRows,
- batchByteSize,
- firstBatchRows);
+ batchByteSize);
assert(maxBatchRows > 0);
- assert(firstBatchRows == maxBatchRows);
+ assert(maxBatchRows <= batchByteSize);
}
// Find the largest value that is acceptable to all lookup descendants.
@@ -4554,17 +4546,13 @@ NdbQueryOperationImpl::prepareAttrInfo(U
Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
Uint32 batchRows = getMaxBatchRows();
- Uint32 batchByteSize, firstBatchRows;
+ Uint32 batchByteSize;
NdbReceiver::calculate_batch_size(* ndb.theImpl,
- m_ndbRecord,
- m_firstRecAttr,
- 0, // Key size.
m_queryImpl.getRootFragCount(),
batchRows,
- batchByteSize,
- firstBatchRows);
- assert(batchRows == firstBatchRows);
+ batchByteSize);
assert(batchRows == getMaxBatchRows());
+ assert(batchRows <= batchByteSize);
assert(m_parallelism == Parallelism_max ||
m_parallelism == Parallelism_adaptive);
if (m_parallelism == Parallelism_max)
=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-08-17 12:36:56 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp 2011-11-09 13:10:53 +0000
@@ -155,88 +155,57 @@ NdbReceiver::prepareRead(char *buf, Uint
Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
use, taking into account limits in the transporter, user preference, etc.
- Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
- would be nice with some explanation on how these numbers were derived.
+ It is the responsibility of the batch producer (LQH+TUP) to
+ stay within these 'batch_size' and 'batch_byte_size' limits.:
- TODO : Check whether these numbers need to be revised w.r.t. read packed
+ - It should stay strictly within the 'batch_size' (#rows) limit.
+ - It is allowed to overallocate the 'batch_byte_size' (slightly)
+ in order to complete the current row when it hit the limit.
+
+ The client should be prepared to receive, and buffer, upto
+ 'batch_size' rows from each fragment.
+ ::ndbrecord_rowsize() might be usefull for calculating the
+ buffersize to allocate for this resultset.
*/
//static
void
NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
- const NdbRecord *record,
- const NdbRecAttr *first_rec_attr,
- Uint32 key_size,
Uint32 parallelism,
Uint32& batch_size,
- Uint32& batch_byte_size,
- Uint32& first_batch_size)
+ Uint32& batch_byte_size)
{
const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
const Uint32 max_batch_size= cfg.m_batch_size;
- Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
- if (record)
- {
- tot_size+= record->m_max_transid_ai_bytes;
- }
-
- const NdbRecAttr *rec_attr= first_rec_attr;
- while (rec_attr != NULL) {
- Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
- attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
- tot_size+= attr_size;
- rec_attr= rec_attr->next();
+ batch_byte_size= max_batch_byte_size;
+ if (batch_byte_size * parallelism > max_scan_batch_size) {
+ batch_byte_size= max_scan_batch_size / parallelism;
}
- tot_size+= 32; //include signal overhead
-
- /**
- * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
- * bytes sent for each batch from each node. We do however ensure that
- * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
- * batch.
- */
- if (batch_size == 0)
- {
- batch_byte_size= max_batch_byte_size;
+ if (batch_size == 0 || batch_size > max_batch_size) {
+ batch_size= max_batch_size;
}
- else
- {
- batch_byte_size= batch_size * tot_size;
+ if (unlikely(batch_size > MAX_PARALLEL_OP_PER_SCAN)) {
+ batch_size= MAX_PARALLEL_OP_PER_SCAN;
}
-
- if (batch_byte_size * parallelism > max_scan_batch_size) {
- batch_byte_size= max_scan_batch_size / parallelism;
- }
- batch_size= batch_byte_size / tot_size;
- if (batch_size == 0) {
- batch_size= 1;
- } else {
- if (batch_size > max_batch_size) {
- batch_size= max_batch_size;
- } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
- batch_size= MAX_PARALLEL_OP_PER_SCAN;
- }
+ if (unlikely(batch_size > batch_byte_size)) {
+ batch_size= batch_byte_size;
}
- first_batch_size= batch_size;
+
return;
}
void
-NdbReceiver::calculate_batch_size(Uint32 key_size,
- Uint32 parallelism,
+NdbReceiver::calculate_batch_size(Uint32 parallelism,
Uint32& batch_size,
- Uint32& batch_byte_size,
- Uint32& first_batch_size,
- const NdbRecord *record) const
+ Uint32& batch_byte_size) const
{
calculate_batch_size(* m_ndb->theImpl,
- record,
- theFirstRecAttr,
- key_size, parallelism, batch_size, batch_byte_size,
- first_batch_size);
+ parallelism,
+ batch_size,
+ batch_byte_size);
}
void
=== modified file 'storage/ndb/src/ndbapi/NdbRecord.hpp'
--- a/storage/ndb/src/ndbapi/NdbRecord.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbRecord.hpp 2011-11-09 13:10:53 +0000
@@ -189,8 +189,6 @@ public:
Uint32 tableVersion;
/* Copy of table->m_keyLenInWords. */
Uint32 m_keyLenInWords;
- /* Total maximum size of TRANSID_AI data (for computing batch size). */
- Uint32 m_max_transid_ai_bytes;
/**
* Number of distribution keys (usually == number of primary keys).
*
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-05-17 12:47:21 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-11-09 13:10:53 +0000
@@ -2284,16 +2284,13 @@ int NdbScanOperation::prepareSendScan(Ui
*/
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
Uint32 batch_size = req->first_batch_size; // User specified
- Uint32 batch_byte_size, first_batch_size;
- theReceiver.calculate_batch_size(key_size,
- theParallelism,
+ Uint32 batch_byte_size;
+ theReceiver.calculate_batch_size(theParallelism,
batch_size,
- batch_byte_size,
- first_batch_size,
- m_attribute_record);
+ batch_byte_size);
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
req->batch_byte_size= batch_byte_size;
- req->first_batch_size= first_batch_size;
+ req->first_batch_size= batch_size;
/**
* Set keyinfo, nodisk and distribution key flags in
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2011-11-09 14:19:23 +0000
@@ -37,7 +37,7 @@
#define MAX_SEEK 16
#define MAXSTRLEN 16
#define MAXATTR 64
-#define MAXTABLES 64
+#define MAXTABLES 1
#define NDB_MAXTHREADS 128
/*
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -59,6 +59,16 @@ enum StartType {
stStop
} ;
+enum RunType {
+ RunInsert,
+ RunRead,
+ RunUpdate,
+ RunDelete,
+ RunCreateTable,
+ RunDropTable,
+ RunAll
+};
+
struct ThreadNdb
{
int NoOfOps;
@@ -70,6 +80,7 @@ extern "C" { static void* threadLoop(voi
static void setAttrNames(void);
static void setTableNames(void);
static int readArguments(int argc, const char** argv);
+static void dropTables(Ndb* pMyNdb);
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
@@ -77,6 +88,8 @@ static void defineNdbRecordOperation(Thr
Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeTransLoop(ThreadNdb* pThread, StartType aType, Ndb* aNdbObject,
+ unsigned int threadBase, int threadNo);
static void executeCallback(int result, NdbConnection* NdbObject,
void* aObject);
static bool error_handler(const NdbError & err);
@@ -92,9 +105,15 @@ ErrorData * flexAsynchErrorData;
static NdbThread* threadLife[NDB_MAXTHREADS];
static int tNodeId;
static int ThreadReady[NDB_MAXTHREADS];
+static longlong ThreadExecutions[NDB_MAXTHREADS];
static StartType ThreadStart[NDB_MAXTHREADS];
static char tableName[MAXTABLES][MAXSTRLEN+1];
static char attrName[MAXATTR][MAXSTRLEN+1];
+static RunType tRunType = RunAll;
+static int tStdTableNum = 0;
+static int tWarmupTime = 10; //Seconds
+static int tExecutionTime = 30; //Seconds
+static int tCooldownTime = 10; //Seconds
// Program Parameters
static NdbRecord * g_record[MAXTABLES];
@@ -126,9 +145,10 @@ static int
#define START_REAL_TIME
#define STOP_REAL_TIME
-#define START_TIMER { NdbTimer timer; timer.doStart();
+#define DEFINE_TIMER NdbTimer timer
+#define START_TIMER timer.doStart();
#define STOP_TIMER timer.doStop();
-#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
+#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans)
NDBT_Stats a_i, a_u, a_d, a_r;
@@ -183,6 +203,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ThreadNdb* pThreadData;
int tLoops=0;
int returnValue = NDBT_OK;
+ DEFINE_TIMER;
flexAsynchErrorData = new ErrorData;
flexAsynchErrorData->resetErrorCounters();
@@ -201,7 +222,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
ndbout << " " << tNoOfParallelTrans;
ndbout << " number of parallel operation per thread " << endl;
ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
- ndbout << " " << tNoOfLoops << " iterations " << endl;
+ if (tRunType == RunAll){
+ ndbout << " " << tNoOfLoops << " iterations " << endl;
+ } else if (tRunType == RunRead || tRunType == RunUpdate){
+ ndbout << " Warmup time is " << tWarmupTime << endl;
+ ndbout << " Execution time is " << tExecutionTime << endl;
+ ndbout << " Cooldown time is " << tCooldownTime << endl;
+ }
ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
ndbout << " " << tAttributeSize;
@@ -262,10 +289,20 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
if (pNdb->waitUntilReady(10000) != 0){
ndbout << "NDB is not ready" << endl;
ndbout << "Benchmark failed!" << endl;
- returnValue = NDBT_FAILED;
+ return NDBT_ProgramExit(NDBT_FAILED);
}
- if(returnValue == NDBT_OK){
+ if (tRunType == RunCreateTable)
+ {
+ if (createTables(pNdb) != 0){
+ returnValue = NDBT_FAILED;
+ }
+ }
+ else if (tRunType == RunDropTable)
+ {
+ dropTables(pNdb);
+ }
+ else if(returnValue == NDBT_OK){
if (createTables(pNdb) != 0){
returnValue = NDBT_FAILED;
}
@@ -282,14 +319,15 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
}
}
- if(returnValue == NDBT_OK){
+ if(returnValue == NDBT_OK &&
+ tRunType != RunCreateTable &&
+ tRunType != RunDropTable){
/****************************************************************
* Create NDB objects. *
****************************************************************/
resetThreads();
for (Uint32 i = 0; i < tNoOfThreads ; i++) {
- pThreadData[i].ThreadNo = i
-;
+ pThreadData[i].ThreadNo = i;
threadLife[i] = NdbThread_Create(threadLoop,
(void**)&pThreadData[i],
32768,
@@ -312,76 +350,86 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
****************************************************************/
failed = 0 ;
-
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int ci = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"
- << endl << endl;
- ndbout << "Attempting to redo the failed transactions now..."
- << endl ;
- ndbout << "Redo attempt " << ci <<" out of " << retry_opt
- << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stInsert);
- STOP_TIMER;
- PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- ci++;
- }
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl
- << endl;
+ if (tRunType == RunAll || tRunType == RunInsert){
+ ndbout << "Executing inserts" << endl;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ }
+ if (tRunType == RunAll){
+ a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int ci = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"
+ << endl << endl;
+ ndbout << "Attempting to redo the failed transactions now..."
+ << endl ;
+ ndbout << "Redo attempt " << ci <<" out of " << retry_opt
+ << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stInsert);
+ STOP_TIMER;
+ PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ ci++;
+ }
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }else{
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl
+ << endl;
+ }//if
}//if
- }//if
-
+ }//if
/****************************************************************
* Perform read. *
****************************************************************/
failed = 0 ;
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }
-
- if (0 < failed) {
- int i = retry_opt ;
- int cr = 1;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl;
- ndbout <<"Redo attempt " << cr <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
+ if (tRunType == RunAll || tRunType == RunRead){
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
START_TIMER;
execute(stRead);
STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr++ ;
- }//while
- if(0 == failed ) {
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+ if (tRunType == RunAll){
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ }//if
+ }//for
+ }//if
+
+ if (tRunType == RunAll){
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cr = 1;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl;
+ ndbout <<"Redo attempt " << cr <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr++ ;
+ }//while
+ if(0 == failed ) {
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }else{
+ ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+ }//if
}//if
}//if
@@ -391,35 +439,40 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
****************************************************************/
failed = 0 ;
-
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
- if (0 < failed) {
- int i = retry_opt ;
- int cu = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cu <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stUpdate);
- STOP_TIMER;
- PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cu++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- } else {
- ndbout << endl;
- ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+
+ if (tRunType == RunAll || tRunType == RunUpdate){
+ ndbout << "Executing updates" << endl;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll){
+ a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cu = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cu <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stUpdate);
+ STOP_TIMER;
+ PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cu++ ;
+ }//while
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ } else {
+ ndbout << endl;
+ ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+ }//if
}//if
}//if
@@ -428,38 +481,41 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
****************************************************************/
failed = 0 ;
-
- for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
- {
- START_TIMER;
- execute(stRead);
- STOP_TIMER;
- a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- }
-
- if (0 < failed) {
- int i = retry_opt ;
- int cr2 = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<<endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now..." << endl;
- ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
+
+ if (tRunType == RunAll){
+ for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+ {
+ ndbout << "Executing reads" << endl;
START_TIMER;
execute(stRead);
STOP_TIMER;
+ a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cr2++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cr2 = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<<endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now..." << endl;
+ ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stRead);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cr2++ ;
+ }//while
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+ }else{
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
}//if
}//if
@@ -470,34 +526,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
failed = 0 ;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
- PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
- if (0 < failed) {
- int i = retry_opt ;
- int cd = 1 ;
- while (0 < failed && 0 < i){
- ndbout << failed << " of the transactions returned errors!"<< endl ;
- ndbout << endl;
- ndbout <<"Attempting to redo the failed transactions now:" << endl ;
- ndbout << endl <<"Redo attempt " << cd <<" out of ";
- ndbout << retry_opt << endl << endl;
- failed = 0 ;
- START_TIMER;
- execute(stDelete);
- STOP_TIMER;
- PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
- i-- ;
- cd++ ;
- }//while
- if(0 == failed ){
- ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
- }else{
- ndbout << endl;
- ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ if (tRunType == RunAll || tRunType == RunDelete){
+ ndbout << "Executing deletes" << endl;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ }//if
+ if (tRunType == RunAll){
+ a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+ PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+ if (0 < failed) {
+ int i = retry_opt ;
+ int cd = 1 ;
+ while (0 < failed && 0 < i){
+ ndbout << failed << " of the transactions returned errors!"<< endl ;
+ ndbout << endl;
+ ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+ ndbout << endl <<"Redo attempt " << cd <<" out of ";
+ ndbout << retry_opt << endl << endl;
+ failed = 0 ;
+ START_TIMER;
+ execute(stDelete);
+ STOP_TIMER;
+ PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+ i-- ;
+ cd++ ;
+ }//while
+ if(0 == failed ){
+ ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+ }else{
+ ndbout << endl;
+ ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+ }//if
}//if
}//if
@@ -516,17 +577,61 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
NdbThread_WaitFor(threadLife[i], &tmp);
NdbThread_Destroy(&threadLife[i]);
}
- }
+ }
+
+ if (tRunType == RunAll)
+ {
+ dropTables(pNdb);
+ }
delete [] pThreadData;
delete pNdb;
- //printing errorCounters
- flexAsynchErrorData->printErrorCounters(ndbout);
-
- print("insert", a_i);
- print("update", a_u);
- print("delete", a_d);
- print("read ", a_r);
+ if (tRunType == RunAll ||
+ tRunType == RunInsert ||
+ tRunType == RunDelete ||
+ tRunType == RunUpdate ||
+ tRunType == RunRead)
+ {
+ //printing errorCounters
+ flexAsynchErrorData->printErrorCounters(ndbout);
+ if (tRunType == RunAll) {
+ print("insert", a_i);
+ print("update", a_u);
+ print("delete", a_d);
+ print("read ", a_r);
+ }
+ }
+ if (tRunType == RunInsert ||
+ tRunType == RunRead ||
+ tRunType == RunUpdate ||
+ tRunType == RunDelete)
+ {
+ longlong total_executions = 0;
+ longlong total_transactions;
+ longlong exec_time;
+
+ if (tRunType == RunInsert || tRunType == RunDelete) {
+ total_executions = (longlong)tNoOfTransactions;
+ total_executions *= (longlong)tNoOfThreads;
+ } else {
+ for (Uint32 i = 0; i < tNoOfThreads; i++){
+ total_executions += ThreadExecutions[i];
+ }
+ }
+ total_transactions = total_executions * tNoOfParallelTrans;
+ if (tRunType == RunInsert || tRunType == RunDelete) {
+ exec_time = (longlong)timer.elapsedTime();
+ } else {
+ exec_time = (longlong)tExecutionTime * 1000;
+ }
+ ndbout << "Total number of transactions is " << total_transactions;
+ ndbout << endl;
+ ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+
+ total_transactions = (total_transactions * 1000) / exec_time;
+ int trans_per_sec = (int)total_transactions;
+ ndbout << "Total transactions per second " << trans_per_sec << endl;
+ }
delete [] g_cluster_connection;
@@ -551,7 +656,7 @@ threadLoop(void* ThreadData)
localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
localNdb->init(1024);
localNdb->waitUntilReady(10000);
- unsigned int threadBase = (threadNo << 16) + tNodeId ;
+ unsigned int threadBase = (threadNo << 16);
for (;;){
while (ThreadStart[threadNo] == stIdle) {
@@ -565,8 +670,14 @@ threadLoop(void* ThreadData)
tType = ThreadStart[threadNo];
ThreadStart[threadNo] = stIdle;
- if(!executeThread(tabThread, tType, localNdb, threadBase)){
- break;
+ if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
+ if(!executeThread(tabThread, tType, localNdb, threadBase)){
+ break;
+ }
+ } else {
+ if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+ break;
+ }
}
ThreadReady[threadNo] = 1;
}//for
@@ -577,80 +688,131 @@ threadLoop(void* ThreadData)
return NULL;
}//threadLoop()
-static
-bool
-executeThread(ThreadNdb* pThread,
- StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+static int error_count = 0;
+static bool
+executeTrans(ThreadNdb* pThread,
+ StartType aType,
+ Ndb* aNdbObject,
+ unsigned int threadBase,
+ unsigned int i)
+{
NdbConnection* tConArray[1024];
unsigned int tBase;
unsigned int tBase2;
- unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
-
- for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
- {
- for (unsigned int i = 0; i < tNoOfTransactions; i++) {
- if (tLocal == false) {
- tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
- } else {
- tBase = i * tNoOfParallelTrans * MAX_SEEK;
- }//if
- START_REAL_TIME;
- for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
- if (tLocal == false) {
- tBase2 = tBase + (j * tNoOfOpsPerTrans);
- } else {
- tBase2 = tBase + (j * MAX_SEEK);
- tBase2 = getKey(threadBase, tBase2);
- }//if
- if (startTransGuess == true) {
- union {
- Uint64 Tkey64;
- Uint32 Tkey32[2];
- };
- Tkey32[0] = threadBase;
- Tkey32[1] = tBase2;
- tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
- (const char*)&Tkey64, //Main PKey
- (Uint32)4); //Key Length
- } else {
- tConArray[j] = aNdbObject->startTransaction();
- }//if
- if (tConArray[j] == NULL &&
- !error_handler(aNdbObject->getNdbError()) ){
- ndbout << endl << "Unable to recover! Quiting now" << endl ;
- return false;
- }//if
-
- for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
- //-------------------------------------------------------
- // Define the operation, but do not execute it yet.
- //-------------------------------------------------------
- if (tNdbRecord)
- defineNdbRecordOperation(pThread,
- tConArray[j], aType, threadBase,(tBase2+k));
- else
- defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
- }//for
-
- tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
- }//for
- STOP_REAL_TIME;
+ if (tLocal == false) {
+ tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+ } else {
+ tBase = i * tNoOfParallelTrans * MAX_SEEK;
+ }//if
+ START_REAL_TIME;
+ for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+ if (tLocal == false) {
+ tBase2 = tBase + (j * tNoOfOpsPerTrans);
+ } else {
+ tBase2 = tBase + (j * MAX_SEEK);
+ tBase2 = getKey(threadBase, tBase2);
+ }//if
+ if (startTransGuess == true) {
+ union {
+ Uint64 Tkey64;
+ Uint32 Tkey32[2];
+ };
+ Tkey32[0] = threadBase;
+ Tkey32[1] = tBase2;
+ tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+ (const char*)&Tkey64, //Main PKey
+ (Uint32)4); //Key Length
+ } else {
+ tConArray[j] = aNdbObject->startTransaction();
+ }//if
+ if (tConArray[j] == NULL){
+ error_handler(aNdbObject->getNdbError());
+ ndbout << endl << "Unable to recover! Quiting now" << endl ;
+ return false;
+ }//if
+
+ for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
//-------------------------------------------------------
- // Now we have defined a set of operations, it is now time
- // to execute all of them.
+ // Define the operation, but do not execute it yet.
//-------------------------------------------------------
- int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
- while (unsigned(Tcomp) < tNoOfParallelTrans) {
- int TlocalComp = aNdbObject->pollNdb(3000, 0);
- Tcomp += TlocalComp;
- }//while
- for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
- aNdbObject->closeTransaction(tConArray[j]);
- }//for
+ if (tNdbRecord)
+ defineNdbRecordOperation(pThread,
+ tConArray[j], aType, threadBase,(tBase2+k));
+ else
+ defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
}//for
- } // for
+
+ tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+ }//for
+ STOP_REAL_TIME;
+ //-------------------------------------------------------
+ // Now we have defined a set of operations, it is now time
+ // to execute all of them.
+ //-------------------------------------------------------
+ int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+ while (unsigned(Tcomp) < tNoOfParallelTrans) {
+ int TlocalComp = aNdbObject->pollNdb(3000, 0);
+ Tcomp += TlocalComp;
+ }//while
+ for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
+ if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+ error_count++;
+ ndbout << "i = " << i << ", j = " << j << ", error = ";
+ ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+ ndbout << hex << threadBase << endl;
+ }
+ aNdbObject->closeTransaction(tConArray[j]);
+ }//for
+ return true;
+}
+
+static
+bool
+executeTransLoop(ThreadNdb* pThread,
+ StartType aType,
+ Ndb* aNdbObject,
+ unsigned int threadBase,
+ int threadNo) {
+ bool continue_flag = true;
+ int time_expired;
+ longlong executions = 0;
+ unsigned int i = 0;
+ DEFINE_TIMER;
+
+ ThreadExecutions[threadNo] = 0;
+ START_TIMER;
+ do
+ {
+ if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+ return false;
+ STOP_TIMER;
+ time_expired = timer.elapsedTime() / 1000;
+ if (time_expired < tWarmupTime)
+ ; //Do nothing
+ else if (time_expired < (tWarmupTime + tExecutionTime)){
+ executions++; //Count measurement
+ }
+ else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
+ ; //Do nothing
+ else
+ continue_flag = false; //Time expired
+ if (i == tNoOfTransactions) /* Make sure the record exists */
+ i = 0;
+ } while (continue_flag);
+ ThreadExecutions[threadNo] = executions;
+ return true;
+}//executeTransLoop()
+
+static
+bool
+executeThread(ThreadNdb* pThread,
+ StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+ for (unsigned int i = 0; i < tNoOfTransactions; i++) {
+ if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+ return false;
+ }//for
return true;
}//executeThread()
@@ -880,8 +1042,20 @@ static void setTableNames()
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", i,
(unsigned)(NdbTick_CurrentMillisecond()+rand()));
} else {
- BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
+ BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", tStdTableNum);
}
+ ndbout << "Using table name " << tableName[0] << endl;
+ }
+}
+
+static void
+dropTables(Ndb* pMyNdb)
+{
+ int i;
+ for (i = 0; i < MAXTABLES; i++)
+ {
+ ndbout << "Dropping table " << tableName[i] << "..." << endl;
+ pMyNdb->getDictionary()->dropTable(tableName[i]);
}
}
@@ -893,8 +1067,8 @@ createTables(Ndb* pMyNdb){
NdbSchemaOp *MySchemaOp;
int check;
- if (theTableCreateFlag == 0) {
- for(int i=0; i < 1 ;i++) {
+ if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
+ for(int i=0; i < MAXTABLES ;i++) {
ndbout << "Creating " << tableName[i] << "..." << endl;
MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
@@ -953,31 +1127,35 @@ createTables(Ndb* pMyNdb){
return -1;
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+ }
+ }
+ if (tNdbRecord)
+ {
+ for(int i=0; i < MAXTABLES ;i++) {
+ NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+ const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
- if (tNdbRecord)
+ if (pTab == NULL){
+ error_handler(pDict->getNdbError());
+ return -1;
+ }
+ int off = 0;
+ Vector<NdbDictionary::RecordSpecification> spec;
+ for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
{
- NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
- const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
-
- int off = 0;
- Vector<NdbDictionary::RecordSpecification> spec;
- for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
- {
- NdbDictionary::RecordSpecification r0;
- r0.column = pTab->getColumn(j);
- r0.offset = off;
- off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
- spec.push_back(r0);
- }
- g_record[i] =
+ NdbDictionary::RecordSpecification r0;
+ r0.column = pTab->getColumn(j);
+ r0.offset = off;
+ off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+ spec.push_back(r0);
+ }
+ g_record[i] =
pDict->createRecord(pTab, spec.getBase(),
spec.size(),
sizeof(NdbDictionary::RecordSpecification));
- assert(g_record[i]);
- }
+ assert(g_record[i]);
}
}
-
return 0;
}
@@ -996,6 +1174,14 @@ bool error_handler(const NdbError & err)
return false ; // return false to abort
}
+static void
+setAggregateRun(void)
+{
+ tNoOfLoops = 1;
+ tExtraReadLoop = 0;
+ theTableCreateFlag = 1;
+}
+
static
int
readArguments(int argc, const char** argv){
@@ -1110,6 +1296,43 @@ readArguments(int argc, const char** arg
tExtraReadLoop = atoi(argv[i+1]);
} else if (strcmp(argv[i], "-con") == 0){
tConnections = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-insert") == 0){
+ setAggregateRun();
+ tRunType = RunInsert;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-read") == 0){
+ setAggregateRun();
+ tRunType = RunRead;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-update") == 0){
+ setAggregateRun();
+ tRunType = RunUpdate;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-delete") == 0){
+ setAggregateRun();
+ tRunType = RunDelete;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-create_table") == 0){
+ tRunType = RunCreateTable;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-drop_table") == 0){
+ tRunType = RunDropTable;
+ argc++;
+ i--;
+ } else if (strcmp(argv[i], "-warmup_time") == 0){
+ tWarmupTime = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-execution_time") == 0){
+ tExecutionTime = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-cooldown_time") == 0){
+ tCooldownTime = atoi(argv[i+1]);
+ } else if (strcmp(argv[i], "-table") == 0){
+ tStdTableNum = atoi(argv[i+1]);
+ theStdTableNameFlag = 1;
} else {
return -1;
}
@@ -1131,7 +1354,6 @@ readArguments(int argc, const char** arg
static
void
input_error(){
-
ndbout_c("FLEXASYNCH");
ndbout_c(" Perform benchmark of insert, update and delete transactions");
ndbout_c(" ");
@@ -1156,7 +1378,18 @@ input_error(){
ndbout_c(" -force Force send when communicating");
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
ndbout_c(" -local Number of part, only use keys in one part out of 16");
- ndbout_c(" -ndbrecord");
+ ndbout_c(" -ndbrecord Use NDB Record");
+ ndbout_c(" -r Number of extra loops");
+ ndbout_c(" -insert Only run inserts on standard table");
+ ndbout_c(" -read Only run reads on standard table");
+ ndbout_c(" -update Only run updates on standard table");
+ ndbout_c(" -delete Only run deletes on standard table");
+ ndbout_c(" -create_table Only run Create Table of standard table");
+ ndbout_c(" -drop_table Only run Drop Table on standard table");
+ ndbout_c(" -warmup_time Warmup Time before measurement starts");
+ ndbout_c(" -execution_time Execution Time where measurement is done");
+ ndbout_c(" -cooldown_time Cooldown time after measurement completed");
+ ndbout_c(" -table Number of standard table, default 0");
}
template class Vector<NdbDictionary::RecordSpecification>;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4330 to 4331) | Jonas Oreland | 11 Nov |