3660 Mikael Ronstrom 2011-12-20 [merge]
merge
=== modified file '.bzrignore'
--- a/.bzrignore revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/.bzrignore revid:mikael.ronstrom@stripped
@@ -3046,7 +3046,7 @@ storage/ndb/**/target
storage/ndb/**/*.class
storage/ndb/src/ndbjtie/**/*.sh
storage/ndb/src/ndbjtie/**/*.log
-storage/ndb/src/ndbjtie/jtie/test/myapi/myapi_test
+storage/ndb/src/ndbjtie/**/*_test
storage/ndb/clusterj/**/*MANIFEST.MF
storage/ndb/clusterj/**/*manifest.mf
storage/ndb/test/crund/*.cnf
=== modified file 'mysql-test/CMakeLists.txt'
--- a/mysql-test/CMakeLists.txt revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/CMakeLists.txt revid:mikael.ronstrom@stripped
@@ -13,13 +13,15 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+# MCP_BUG13511612 CHANGE PATTERN var/ to PATTERN var/*
IF(INSTALL_MYSQLTESTDIR)
INSTALL(
DIRECTORY .
DESTINATION ${INSTALL_MYSQLTESTDIR}
USE_SOURCE_PERMISSIONS
COMPONENT Test
- PATTERN "var/" EXCLUDE
+ PATTERN "var/*" EXCLUDE
PATTERN "lib/My/SafeProcess" EXCLUDE
PATTERN "lib/t*" EXCLUDE
PATTERN "CPack" EXCLUDE
=== modified file 'mysql-test/extra/rpl_tests/rpl_insert_ignore.test'
--- a/mysql-test/extra/rpl_tests/rpl_insert_ignore.test revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/extra/rpl_tests/rpl_insert_ignore.test revid:mikael.ronstrom@stripped
@@ -58,6 +58,11 @@ INSERT IGNORE INTO t1 SELECT NULL, t2.b
--let $assert_text= Count of elements in t1 should be 6.
--source include/assert.inc
+# MCP
+# force outstanding executed transactions to be flushed to binlog for ndb
+--save_master_pos
+# MCP
+
if (`SELECT @@BINLOG_FORMAT != 'STATEMENT'`)
{
--let $binlog_position_cmp= =
@@ -91,6 +96,11 @@ eval CREATE TABLE t2 (
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
+# MCP
+# force outstanding executed transactions to be flushed to binlog for ndb
+--save_master_pos
+# MCP
+
--let $binlog_file= query_get_value("SHOW MASTER STATUS", File, 1)
--let $binlog_start= query_get_value("SHOW MASTER STATUS", Position, 1)
--let $statement_file=INSERT INTO t1 SELECT t2.a FROM t2 ORDER BY t2.a ON DUPLICATE KEY UPDATE t1.a= t1.a
@@ -101,6 +111,11 @@ INSERT INTO t2 VALUES (1);
--let $assert_text= Sum of elements in t1 should be 1.
--source include/assert.inc
+# MCP
+# force outstanding executed transactions to be flushed to binlog for ndb
+--save_master_pos
+# MCP
+
if (`SELECT @@BINLOG_FORMAT != 'STATEMENT'`)
{
--let $binlog_position_cmp= =
=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/mysql-test-run.pl revid:mikael.ronstrom@stripped
@@ -3130,11 +3130,14 @@ sub memcached_start {
sub memcached_load_metadata($) {
- my $cluster = shift;
+ my $cluster= shift;
- my $sql_script= my_find_file($basedir,
- ["share", "storage/ndb/memcache/scripts"],
- "ndb_memcache_metadata.sql", NOT_REQUIRED);
+ my $sql_script= my_find_file($bindir,
+ ["share/mysql/memcache-api", # RPM install
+ "share/memcache-api", # Other installs
+ "scripts" # Build tree
+ ],
+ "ndb_memcache_metadata.sql", NOT_REQUIRED);
foreach my $mysqld (mysqlds()) {
if(-d $mysqld->value('datadir') . "/" . "ndbmemcache") {
=== modified file 'mysql-test/suite/ndb/r/ndb_alter_table_online2.result'
--- a/mysql-test/suite/ndb/r/ndb_alter_table_online2.result revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/suite/ndb/r/ndb_alter_table_online2.result revid:mikael.ronstrom@stripped
@@ -49,7 +49,7 @@ name
~ Starting mysqlslap using column b
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-update t1 set b= 0;
+update t1 set b = 0 where pk = 1;
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Alter table t1 add column c
@@ -74,10 +74,10 @@ name
~ Starting mysqlslap using column c
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-update t1 set c= 0;
+update t1 set c = 0 where pk = 1;
select * from t1;
pk a b c
-1 5000 5000 5000
+1 2000 2000 2000
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Alter table t1 and try to add partitions
@@ -105,7 +105,7 @@ name
START TRANSACTION;
SELECT * FROM t1;
pk a b c
-1 5000 5000 5000
+1 2000 2000 2000
# Connection con1
SET lock_wait_timeout=1;
ALTER ONLINE TABLE t1 ADD d INT;
@@ -114,7 +114,7 @@ ALTER ONLINE TABLE t1 ADD d INT;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
SELECT * FROM t1;
pk a b c
-1 5000 5000 5000
+1 2000 2000 2000
COMMIT;
ndb_show_tables completed.....
=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_online2.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_online2.test revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_online2.test revid:mikael.ronstrom@stripped
@@ -60,8 +60,8 @@ set @t1_id = (select id from ndb_show_ta
--echo ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--echo
-let $end_mysqlslap= 5000;
---exec $MYSQL_SLAP --query="update test.t1 set a=a+1 where pk=1" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
+let $end_mysqlslap= 2000;
+--exec $MYSQL_SLAP --query="update test.t1 set a=a+1 where pk=1; select sleep(0.01);" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
# wait for 100 updates
--disable_result_log
@@ -101,8 +101,8 @@ select name from ndb_show_tables_results
--echo ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--echo
-update t1 set b= 0;
---exec $MYSQL_SLAP --query="update test.t1 set b=b+1 where pk=1" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
+update t1 set b = 0 where pk = 1;
+--exec $MYSQL_SLAP --query="update test.t1 set b=b+1 where pk=1; select sleep(0.01);" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
# wait for 100 updates
--disable_result_log
@@ -142,8 +142,8 @@ select name from ndb_show_tables_results
--echo ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--echo
-update t1 set c= 0;
---exec $MYSQL_SLAP --query="update test.t1 set c=c+1 where pk=1" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
+update t1 set c = 0 where pk = 1;
+--exec $MYSQL_SLAP --query="update test.t1 set c=c+1 where pk=1; select sleep(0.01);" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
# wait for mysqlslap to end
--disable_result_log
=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/sql/ha_ndb_index_stat.cc revid:mikael.ronstrom@stripped
@@ -50,7 +50,6 @@ Ndb_index_stat_thread::Ndb_index_stat_th
pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND, NULL);
pthread_cond_init(&COND_ready, NULL);
- pthread_mutex_init(&list_mutex, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&stat_mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init(&stat_cond, NULL);
}
@@ -60,7 +59,6 @@ Ndb_index_stat_thread::~Ndb_index_stat_t
pthread_mutex_destroy(&LOCK);
pthread_cond_destroy(&COND);
pthread_cond_destroy(&COND_ready);
- pthread_mutex_destroy(&list_mutex);
pthread_mutex_destroy(&stat_mutex);
pthread_cond_destroy(&stat_cond);
}
@@ -90,9 +88,12 @@ struct Ndb_index_stat {
time_t check_time; /* when checked for updated stats (>= read_time) */
uint query_bytes; /* cache query bytes in use */
uint clean_bytes; /* cache clean bytes waiting to be deleted */
+ uint drop_bytes; /* cache bytes waiting for drop */
+ uint evict_bytes; /* cache bytes waiting for evict */
bool force_update; /* one-time force update from analyze table */
bool no_stats; /* have detected that no stats exist */
NdbIndexStat::Error error;
+ NdbIndexStat::Error client_error;
time_t error_time;
uint error_count;
struct Ndb_index_stat *share_next; /* per-share list */
@@ -101,7 +102,9 @@ struct Ndb_index_stat {
struct Ndb_index_stat *list_next;
struct Ndb_index_stat *list_prev;
struct NDB_SHARE *share;
+ uint ref_count; /* from client requests */
bool to_delete; /* detached from share and marked for delete */
+ bool abort_request; /* abort all requests and allow no more */
Ndb_index_stat();
};
@@ -556,12 +559,16 @@ struct Ndb_index_stat_glob {
uint event_ok; /* Events received for known index */
uint event_miss; /* Events received for unknown index */
uint refresh_count; /* Successful cache refreshes */
+ uint clean_count; /* Times old caches (1 or more) cleaned */
+ uint pinned_count; /* Times not cleaned due to old cache ref count */
uint drop_count; /* From index drop */
uint evict_count; /* From LRU cleanup */
/* Cache */
uint cache_query_bytes; /* In use */
uint cache_clean_bytes; /* Obsolete versions not yet removed */
uint cache_high_bytes; /* Max ever of above */
+ uint cache_drop_bytes; /* Part of above waiting to be evicted */
+ uint cache_evict_bytes; /* Part of above waiting to be evicted */
char status[2][512];
uint status_i;
@@ -588,11 +595,15 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
event_ok= 0;
event_miss= 0;
refresh_count= 0;
+ clean_count= 0;
+ pinned_count= 0;
drop_count= 0;
evict_count= 0;
cache_query_bytes= 0;
cache_clean_bytes= 0;
cache_high_bytes= 0;
+ cache_drop_bytes= 0;
+ cache_evict_bytes= 0;
memset(status, 0, sizeof(status));
status_i= 0;
}
@@ -601,6 +612,8 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
void
Ndb_index_stat_glob::set_status()
{
+ safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
char* p= status[status_i];
@@ -635,11 +648,13 @@ Ndb_index_stat_glob::set_status()
p+= strlen(p);
sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
p+= strlen(p);
- sprintf(p, ",query:(all:%u,nostats:%u,error:%u)", query_count, query_no_stats, query_error);
+ sprintf(p, ",query:(all:%u,nostats:%u,error:%u)",
+ query_count, query_no_stats, query_error);
p+= strlen(p);
sprintf(p, ",event:(ok:%u,miss:%u)", event_ok, event_miss);
p+= strlen(p);
- sprintf(p, ",cache:(refresh:%u,drop:%u,evict:%u)", refresh_count, drop_count, evict_count);
+ sprintf(p, ",cache:(refresh:%u,clean:%u,pinned:%u,drop:%u,evict:%u)",
+ refresh_count, clean_count, pinned_count, drop_count, evict_count);
p+= strlen(p);
sprintf(p, ")");
p+= strlen(p);
@@ -654,8 +669,12 @@ Ndb_index_stat_glob::set_status()
cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
cache_high_pct= (double)100.0 * (double)cache_high_bytes / (double)cache_limit;
}
- sprintf(p, ",cache:(query:%u,clean:%u,usedpct:%.2f,highpct:%.2f)",
- cache_query_bytes, cache_clean_bytes, cache_pct, cache_high_pct);
+ sprintf(p, ",cache:(query:%u,clean:%u"
+ ",drop:%u,evict:%u"
+ ",usedpct:%.2f,highpct:%.2f)",
+ cache_query_bytes, cache_clean_bytes,
+ cache_drop_bytes, cache_evict_bytes,
+ cache_pct, cache_high_pct);
p+= strlen(p);
// alternating status buffers to keep this lock short
@@ -679,6 +698,8 @@ Ndb_index_stat_glob::zero_total()
event_ok= 0;
event_miss= 0;
refresh_count= 0;
+ clean_count= 0;
+ pinned_count= 0;
drop_count= 0;
evict_count= 0;
/* Reset highest use seen to current */
@@ -704,6 +725,8 @@ Ndb_index_stat::Ndb_index_stat()
check_time= 0;
query_bytes= 0;
clean_bytes= 0;
+ drop_bytes= 0;
+ evict_bytes= 0;
force_update= false;
no_stats= false;
error_time= 0;
@@ -714,24 +737,40 @@ Ndb_index_stat::Ndb_index_stat()
list_next= 0;
list_prev= 0;
share= 0;
+ ref_count= 0;
to_delete= false;
+ abort_request= false;
}
+/*
+ Called by stats thread and (rarely) by client. Caller must hold
+ stat_mutex. Client errors currently have no effect on execution
+ since they are probably local e.g. bad range (internal error).
+ Argument "from" is 0=stats thread 1=client.
+*/
void
-ndb_index_stat_error(Ndb_index_stat *st, const char* place, int line)
+ndb_index_stat_error(Ndb_index_stat *st,
+ int from, const char* place, int line)
{
+ safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
time_t now= ndb_index_stat_time();
NdbIndexStat::Error error= st->is->getNdbError();
if (error.code == 0)
{
- // XXX why this if
+ /* Make sure code is not 0 */
NdbIndexStat::Error error2;
error= error2;
error.code= NdbIndexStat::InternalError;
error.status= NdbError::TemporaryError;
}
- st->error= error;
- st->error_time= now;
+ if (from == 0)
+ {
+ st->error= error;
+ st->error_time= now; /* Controls proc_error */
+ }
+ else
+ st->client_error= error;
st->error_count++;
DBUG_PRINT("index_stat", ("%s line %d: error %d line %d extra %d",
@@ -839,6 +878,8 @@ ndb_index_stat_list_move(Ndb_index_stat
void
ndb_index_stat_force_update(Ndb_index_stat *st, bool onoff)
{
+ safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
if (onoff)
{
@@ -864,6 +905,8 @@ ndb_index_stat_force_update(Ndb_index_st
void
ndb_index_stat_no_stats(Ndb_index_stat *st, bool flag)
{
+ safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
if (st->no_stats != flag)
{
@@ -882,8 +925,36 @@ ndb_index_stat_no_stats(Ndb_index_stat *
}
}
+void
+ndb_index_stat_ref_count(Ndb_index_stat *st, bool flag)
+{
+ safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
+ uint old_count= st->ref_count;
+ (void)old_count; // USED
+ if (flag)
+ {
+ st->ref_count++;
+ }
+ else
+ {
+ assert(st->ref_count != 0);
+ st->ref_count--;
+ }
+ DBUG_PRINT("index_stat", ("st %s ref_count:%u->%u",
+ st->id, old_count, st->ref_count));
+}
+
/* Find or add entry under the share */
+/* Saved in get_share() under stat_mutex */
+struct Ndb_index_stat_snap {
+ time_t load_time;
+ uint sample_version;
+ Ndb_index_stat_snap() { load_time= 0; sample_version= 0; }
+};
+
+/* Subroutine, have lock */
Ndb_index_stat*
ndb_index_stat_alloc(const NDBINDEX *index,
const NDBTAB *table,
@@ -902,8 +973,8 @@ ndb_index_stat_alloc(const NDBINDEX *ind
#endif
if (is->set_index(*index, *table) == 0)
return st;
- ndb_index_stat_error(st, "set_index", __LINE__);
- err_out= st->error.code;
+ ndb_index_stat_error(st, 1, "set_index", __LINE__);
+ err_out= st->client_error.code;
}
else
{
@@ -956,6 +1027,7 @@ Ndb_index_stat*
ndb_index_stat_get_share(NDB_SHARE *share,
const NDBINDEX *index,
const NDBTAB *table,
+ Ndb_index_stat_snap &snap,
int &err_out,
bool allow_add,
bool force_update)
@@ -963,7 +1035,6 @@ ndb_index_stat_get_share(NDB_SHARE *shar
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
pthread_mutex_lock(&share->mutex);
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
time_t now= ndb_index_stat_time();
err_out= 0;
@@ -974,7 +1045,7 @@ ndb_index_stat_get_share(NDB_SHARE *shar
{
if (unlikely(!ndb_index_stat_allow()))
{
- err_out= Ndb_index_stat_error_NOT_ALLOW;
+ err_out= NdbIndexStat::MyNotAllow;
break;
}
st= ndb_index_stat_find_share(share, index, st_last);
@@ -982,7 +1053,7 @@ ndb_index_stat_get_share(NDB_SHARE *shar
{
if (!allow_add)
{
- err_out= Ndb_index_stat_error_NOT_FOUND;
+ err_out= NdbIndexStat::MyNotFound;
break;
}
st= ndb_index_stat_alloc(index, table, err_out);
@@ -995,14 +1066,28 @@ ndb_index_stat_get_share(NDB_SHARE *shar
ndb_index_stat_list_add(st, Ndb_index_stat::LT_New);
glob.set_status();
}
+ else if (unlikely(st->abort_request))
+ {
+ err_out= NdbIndexStat::MyAbortReq;
+ break;
+ }
if (force_update)
ndb_index_stat_force_update(st, true);
+ snap.load_time= st->load_time;
+ snap.sample_version= st->sample_version;
st->access_time= now;
}
while (0);
+
+ if (err_out == 0)
+ {
+ assert(st != 0);
+ ndb_index_stat_ref_count(st, true);
+ }
+ else
+ st= 0;
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
pthread_mutex_unlock(&share->mutex);
return st;
}
@@ -1012,10 +1097,12 @@ ndb_index_stat_get_share(NDB_SHARE *shar
list and set "to_delete" flag. Stats thread does real delete.
*/
-/* caller must hold list_mutex */
+/* caller must hold stat_mutex */
void
ndb_index_stat_free(Ndb_index_stat *st)
{
+ safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
DBUG_ENTER("ndb_index_stat_free");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
NDB_SHARE *share= st->share;
@@ -1037,6 +1124,7 @@ ndb_index_stat_free(Ndb_index_stat *st)
assert(st->lt != Ndb_index_stat::LT_Delete);
assert(!st->to_delete);
st->to_delete= true;
+ st->abort_request= true;
found++;
}
else
@@ -1053,9 +1141,7 @@ ndb_index_stat_free(Ndb_index_stat *st)
assert(found == 1);
share->index_stat_list= st_head;
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
DBUG_VOID_RETURN;
}
@@ -1067,7 +1153,7 @@ ndb_index_stat_free(NDB_SHARE *share, in
DBUG_PRINT("index_stat", ("(index_id:%d index_version:%d",
index_id, index_version));
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
uint found= 0;
Ndb_index_stat *st= share->index_stat_list;
@@ -1078,16 +1164,17 @@ ndb_index_stat_free(NDB_SHARE *share, in
{
ndb_index_stat_free(st);
found++;
+ glob.drop_count++;
+ assert(st->drop_bytes == 0);
+ st->drop_bytes= st->query_bytes + st->clean_bytes;
+ glob.cache_drop_bytes+= st->drop_bytes;
break;
}
st= st->share_next;
}
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
- glob.drop_count+= found;
glob.set_status();
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
DBUG_VOID_RETURN;
}
@@ -1096,7 +1183,8 @@ ndb_index_stat_free(NDB_SHARE *share)
{
DBUG_ENTER("ndb_index_stat_free");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+
uint found= 0;
Ndb_index_stat *st;
while ((st= share->index_stat_list) != 0)
@@ -1109,13 +1197,16 @@ ndb_index_stat_free(NDB_SHARE *share)
assert(st->lt != Ndb_index_stat::LT_Delete);
assert(!st->to_delete);
st->to_delete= true;
+ st->abort_request= true;
found++;
+ glob.drop_count++;
+ assert(st->drop_bytes == 0);
+ st->drop_bytes+= st->query_bytes + st->clean_bytes;
+ glob.cache_drop_bytes+= st->drop_bytes;
}
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
- glob.drop_count+= found;
+
glob.set_status();
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
DBUG_VOID_RETURN;
}
@@ -1126,7 +1217,7 @@ ndb_index_stat_find_entry(int index_id,
{
DBUG_ENTER("ndb_index_stat_find_entry");
pthread_mutex_lock(&ndbcluster_mutex);
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
index_id, index_version, table_id));
@@ -1139,7 +1230,7 @@ ndb_index_stat_find_entry(int index_id,
if (st->index_id == index_id &&
st->index_version == index_version)
{
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(st);
}
@@ -1147,7 +1238,7 @@ ndb_index_stat_find_entry(int index_id,
}
}
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
@@ -1179,7 +1270,7 @@ ndb_index_stat_cache_move(Ndb_index_stat
glob.cache_high_bytes= cache_total;
}
-void
+bool
ndb_index_stat_cache_clean(Ndb_index_stat *st)
{
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
@@ -1187,12 +1278,46 @@ ndb_index_stat_cache_clean(Ndb_index_sta
st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
const uint old_clean_bytes= infoClean.m_totalBytes;
- DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u",
- st->id, old_clean_bytes));
+ const uint ref_count= infoClean.m_ref_count;
+ DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u ref_count:%u",
+ st->id, old_clean_bytes, ref_count));
+ if (ref_count != 0)
+ return false;
st->is->clean_cache();
st->clean_bytes= 0;
assert(glob.cache_clean_bytes >= old_clean_bytes);
glob.cache_clean_bytes-= old_clean_bytes;
+ return true;
+}
+
+void
+ndb_index_stat_cache_evict(Ndb_index_stat *st)
+{
+ NdbIndexStat::Head head;
+ NdbIndexStat::CacheInfo infoBuild;
+ NdbIndexStat::CacheInfo infoQuery;
+ NdbIndexStat::CacheInfo infoClean;
+ st->is->get_head(head);
+ st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
+ st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
+ st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
+
+ DBUG_PRINT("index_stat",
+ ("evict table: %u index: %u version: %u"
+ " sample version: %u"
+ " cache bytes build:%u query:%u clean:%u",
+ head.m_tableId, head.m_indexId, head.m_indexVersion,
+ head.m_sampleVersion,
+ infoBuild.m_totalBytes, infoQuery.m_totalBytes, infoClean.m_totalBytes));
+
+ /* Twice to move all caches to clean */
+ ndb_index_stat_cache_move(st);
+ ndb_index_stat_cache_move(st);
+ /* Unused variable release vs debug nonsense */
+ bool ok= false;
+ (void)ok; // USED
+ ok= ndb_index_stat_cache_clean(st);
+ assert(ok);
}
/* Misc in/out parameters for process steps */
@@ -1231,7 +1356,7 @@ void
ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
{
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
const int lt= Ndb_index_stat::LT_New;
Ndb_index_stat_list &list= ndb_index_stat_list[lt];
@@ -1245,10 +1370,8 @@ ndb_index_stat_proc_new(Ndb_index_stat_p
assert(pr.lt != lt);
ndb_index_stat_list_move(st, pr.lt);
}
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
}
void
@@ -1256,8 +1379,8 @@ ndb_index_stat_proc_update(Ndb_index_sta
{
if (st->is->update_stat(pr.ndb) == -1)
{
- ndb_index_stat_error(st, "update_stat", __LINE__);
pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_error(st, 0, "update_stat", __LINE__);
/*
Turn off force update or else proc_error() thinks
@@ -1311,7 +1434,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
if (st->is->read_stat(pr.ndb) == -1)
{
pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
- ndb_index_stat_error(st, "read_stat", __LINE__);
+ ndb_index_stat_error(st, 0, "read_stat", __LINE__);
const bool force_update= st->force_update;
ndb_index_stat_force_update(st, false);
@@ -1384,6 +1507,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
void
ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
{
+ Ndb_index_stat_glob &glob= ndb_index_stat_glob;
const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
const longlong clean_delay= opt.get(Ndb_index_stat_opt::Iclean_delay);
const longlong check_delay= opt.get(Ndb_index_stat_opt::Icheck_delay);
@@ -1408,7 +1532,10 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
if (st->clean_bytes != 0 && clean_wait <= 0)
{
- ndb_index_stat_cache_clean(st);
+ if (ndb_index_stat_cache_clean(st))
+ glob.clean_count++;
+ else
+ glob.pinned_count++;
}
if (st->force_update)
{
@@ -1482,7 +1609,8 @@ ndb_index_stat_proc_check(Ndb_index_stat
NdbIndexStat::Head head;
if (st->is->read_head(pr.ndb) == -1)
{
- ndb_index_stat_error(st, "read_head", __LINE__);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_error(st, 0, "read_head", __LINE__);
/* no stats is not unexpected error */
if (st->is->getNdbError().code == NdbIndexStat::NoIndexStats)
{
@@ -1493,6 +1621,8 @@ ndb_index_stat_proc_check(Ndb_index_stat
{
pr.lt= Ndb_index_stat::LT_Error;
}
+ pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
return;
}
st->is->get_head(head);
@@ -1537,39 +1667,6 @@ ndb_index_stat_proc_check(Ndb_index_stat
pr.busy= true;
}
-/* Only evict the caches */
-void
-ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
-{
- Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-
- NdbIndexStat::Head head;
- NdbIndexStat::CacheInfo infoBuild;
- NdbIndexStat::CacheInfo infoQuery;
- NdbIndexStat::CacheInfo infoClean;
- st->is->get_head(head);
- st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
- st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
- st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
-
- DBUG_PRINT("index_stat",
- ("evict table: %u index: %u version: %u"
- " sample version: %u"
- " cache bytes build:%u query:%u clean:%u",
- head.m_tableId, head.m_indexId, head.m_indexVersion,
- head.m_sampleVersion,
- infoBuild.m_totalBytes, infoQuery.m_totalBytes, infoClean.m_totalBytes));
-
- /* Twice to move all caches to clean */
- ndb_index_stat_cache_move(st);
- ndb_index_stat_cache_move(st);
- ndb_index_stat_cache_clean(st);
-
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
- glob.set_status();
- pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-}
-
/* Check if need to evict more */
bool
ndb_index_stat_proc_evict()
@@ -1577,6 +1674,11 @@ ndb_index_stat_proc_evict()
const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
uint curr_size= glob.cache_query_bytes + glob.cache_clean_bytes;
+
+ /* Subtract bytes already scheduled for evict */
+ assert(curr_size >= glob.cache_evict_bytes);
+ curr_size-= glob.cache_evict_bytes;
+
const uint cache_lowpct= opt.get(Ndb_index_stat_opt::Icache_lowpct);
const uint cache_limit= opt.get(Ndb_index_stat_opt::Icache_limit);
if (100 * curr_size <= cache_lowpct * cache_limit)
@@ -1612,6 +1714,9 @@ ndb_index_stat_proc_evict(Ndb_index_stat
if (!ndb_index_stat_proc_evict())
return;
+ /* Mutex entire routine (protect access_time) */
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+
/* Create a LRU batch */
Ndb_index_stat* st_lru_arr[ndb_index_stat_max_evict_batch + 1];
uint st_lru_cnt= 0;
@@ -1690,19 +1795,19 @@ ndb_index_stat_proc_evict(Ndb_index_stat
Ndb_index_stat *st= st_lru_arr[cnt];
DBUG_PRINT("index_stat", ("st %s proc evict %s", st->id, list.name));
- ndb_index_stat_proc_evict(pr, st);
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+
+ /* Entry may have requests. Cache is evicted at delete. */
ndb_index_stat_free(st);
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+ assert(st->evict_bytes == 0);
+ st->evict_bytes= st->query_bytes + st->clean_bytes;
+ glob.cache_evict_bytes+= st->evict_bytes;
cnt++;
}
+ if (cnt == batch)
+ pr.busy= true;
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.evict_count+= cnt;
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-
- if (cnt == batch)
- pr.busy= true;
}
void
@@ -1722,6 +1827,9 @@ ndb_index_stat_proc_delete(Ndb_index_sta
const uint delete_batch= opt.get(Ndb_index_stat_opt::Idelete_batch);
const uint batch= !pr.end ? delete_batch : ~(uint)0;
+ /* Mutex entire routine */
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+
Ndb_index_stat *st_loop= list.head;
uint cnt= 0;
while (st_loop != 0 && cnt < batch)
@@ -1731,12 +1839,26 @@ ndb_index_stat_proc_delete(Ndb_index_sta
DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
// adjust global counters at drop
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
ndb_index_stat_force_update(st, false);
ndb_index_stat_no_stats(st, false);
- pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
- ndb_index_stat_proc_evict(pr, st);
+ /*
+ Do not wait for requests to terminate since this could
+ risk stats thread hanging. Instead try again next time.
+ Presumably clients will eventually notice abort_request.
+ */
+ if (st->ref_count != 0)
+ {
+ DBUG_PRINT("index_stat", ("st %s proc %s: ref_count:%u",
+ st->id, list.name, st->ref_count));
+ continue;
+ }
+
+ ndb_index_stat_cache_evict(st);
+ assert(glob.cache_drop_bytes >= st->drop_bytes);
+ glob.cache_drop_bytes-= st->drop_bytes;
+ assert(glob.cache_evict_bytes >= st->evict_bytes);
+ glob.cache_evict_bytes-= st->evict_bytes;
ndb_index_stat_list_remove(st);
delete st->is;
delete st;
@@ -1745,7 +1867,6 @@ ndb_index_stat_proc_delete(Ndb_index_sta
if (cnt == batch)
pr.busy= true;
- pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
glob.set_status();
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
@@ -2004,7 +2125,7 @@ void
ndb_index_stat_list_verify(Ndb_index_stat_proc &pr)
{
const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
- pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
pr.cache_query_bytes= 0;
pr.cache_clean_bytes= 0;
@@ -2013,7 +2134,7 @@ ndb_index_stat_list_verify(Ndb_index_sta
assert(glob.cache_query_bytes == pr.cache_query_bytes);
assert(glob.cache_clean_bytes == pr.cache_clean_bytes);
- pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
}
void
@@ -2498,12 +2619,16 @@ ndb_index_stat_round(double x)
return n;
}
+/*
+ Client waits for query or analyze. The routines are
+ similar but separated for clarity.
+*/
+
int
-ndb_index_stat_wait(Ndb_index_stat *st,
- uint sample_version,
- bool from_analyze)
+ndb_index_stat_wait_query(Ndb_index_stat *st,
+ const Ndb_index_stat_snap &snap)
{
- DBUG_ENTER("ndb_index_stat_wait");
+ DBUG_ENTER("ndb_index_stat_wait_query");
Ndb_index_stat_glob &glob= ndb_index_stat_glob;
pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
@@ -2515,24 +2640,16 @@ ndb_index_stat_wait(Ndb_index_stat *st,
int ret= 0;
if (count == 0)
{
- if (!from_analyze)
- {
- glob.wait_stats++;
- glob.query_count++;
- }
- else
+ glob.wait_stats++;
+ glob.query_count++;
+ if (st->lt == Ndb_index_stat::LT_Error)
{
- glob.wait_update++;
- glob.analyze_count++;
- }
- if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
- {
- err= Ndb_index_stat_error_HAS_ERROR;
+ err= NdbIndexStat::MyHasError;
break;
}
ndb_index_stat_clear_error(st);
}
- if (st->no_stats && !from_analyze)
+ if (st->no_stats)
{
/* Have detected no stats now or before */
err= NdbIndexStat::NoIndexStats;
@@ -2543,16 +2660,29 @@ ndb_index_stat_wait(Ndb_index_stat *st,
{
/* A new error has occured */
err= st->error.code;
- if (!from_analyze)
- glob.query_error++;
- else
- glob.analyze_error++;
+ glob.query_error++;
+ break;
+ }
+ /* Query waits for any samples */
+ if (st->sample_version > 0)
+ break;
+ /*
+ Try to detect changes behind our backs. Should really not
+ happen but make sure.
+ */
+ if (st->load_time != snap.load_time ||
+ st->sample_version != snap.sample_version)
+ {
+ err= NdbIndexStat::NoIndexStats;
break;
}
- if (st->sample_version > sample_version)
+ if (st->abort_request)
+ {
+ err= NdbIndexStat::MyAbortReq;
break;
+ }
count++;
- DBUG_PRINT("index_stat", ("st %s wait count:%u",
+ DBUG_PRINT("index_stat", ("st %s wait_query count:%u",
st->id, count));
pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
ndb_index_stat_waiter= true;
@@ -2568,25 +2698,93 @@ ndb_index_stat_wait(Ndb_index_stat *st,
break;
}
}
- if (!from_analyze)
+ assert(glob.wait_stats != 0);
+ glob.wait_stats--;
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ if (err != 0)
{
- assert(glob.wait_stats != 0);
- glob.wait_stats--;
+ DBUG_PRINT("index_stat", ("st %s wait_query error: %d",
+ st->id, err));
+ DBUG_RETURN(err);
}
- else
+ DBUG_PRINT("index_stat", ("st %s wait_query ok: sample_version %u -> %u",
+ st->id, snap.sample_version, st->sample_version));
+ DBUG_RETURN(0);
+}
+
+int
+ndb_index_stat_wait_analyze(Ndb_index_stat *st,
+ const Ndb_index_stat_snap &snap)
+{
+ DBUG_ENTER("ndb_index_stat_wait_analyze");
+
+ Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ int err= 0;
+ uint count= 0;
+ struct timespec abstime;
+ while (true)
{
- assert(glob.wait_update != 0);
- glob.wait_update--;
+ int ret= 0;
+ if (count == 0)
+ {
+ glob.wait_update++;
+ glob.analyze_count++;
+ ndb_index_stat_clear_error(st);
+ }
+ if (st->error.code != 0)
+ {
+ /* A new error has occured */
+ err= st->error.code;
+ glob.analyze_error++;
+ break;
+ }
+ /* Analyze waits for newer samples */
+ if (st->sample_version > snap.sample_version)
+ break;
+ /*
+ Try to detect changes behind our backs. If another process
+ deleted stats, an analyze here could wait forever.
+ */
+ if (st->load_time != snap.load_time ||
+ st->sample_version != snap.sample_version)
+ {
+ err= NdbIndexStat::AlienUpdate;
+ break;
+ }
+ if (st->abort_request)
+ {
+ err= NdbIndexStat::MyAbortReq;
+ break;
+ }
+ count++;
+ DBUG_PRINT("index_stat", ("st %s wait_analyze count:%u",
+ st->id, count));
+ pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+ ndb_index_stat_waiter= true;
+ pthread_cond_signal(&ndb_index_stat_thread.COND);
+ pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
+ set_timespec(abstime, 1);
+ ret= pthread_cond_timedwait(&ndb_index_stat_thread.stat_cond,
+ &ndb_index_stat_thread.stat_mutex,
+ &abstime);
+ if (ret != 0 && ret != ETIMEDOUT)
+ {
+ err= ret;
+ break;
+ }
}
+ assert(glob.wait_update != 0);
+ glob.wait_update--;
pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
if (err != 0)
{
- DBUG_PRINT("index_stat", ("st %s wait error: %d",
+ DBUG_PRINT("index_stat", ("st %s wait_analyze error: %d",
st->id, err));
DBUG_RETURN(err);
}
- DBUG_PRINT("index_stat", ("st %s wait ok: sample_version %u -> %u",
- st->id, sample_version, st->sample_version));
+ DBUG_PRINT("index_stat", ("st %s wait_analyze ok: sample_version %u -> %u",
+ st->id, snap.sample_version, st->sample_version));
DBUG_RETURN(0);
}
@@ -2611,37 +2809,51 @@ ha_ndbcluster::ndb_index_stat_query(uint
compute_index_bounds(ib, key_info, min_key, max_key, from);
ib.range_no= 0;
+ Ndb_index_stat_snap snap;
Ndb_index_stat *st=
- ndb_index_stat_get_share(m_share, index, m_table, err, true, false);
+ ndb_index_stat_get_share(m_share, index, m_table, snap, err, true, false);
if (st == 0)
DBUG_RETURN(err);
+ /* Now holding reference to st */
- /* Pass old version 0 so existing stats terminates wait at once */
- err= ndb_index_stat_wait(st, 0, false);
- if (err != 0)
- DBUG_RETURN(err);
- assert(st->sample_version != 0);
-
- uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
- uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
- NdbIndexStat::Bound bound_lo(st->is, bound_lo_buffer);
- NdbIndexStat::Bound bound_hi(st->is, bound_hi_buffer);
- NdbIndexStat::Range range(bound_lo, bound_hi);
-
- const NdbRecord* key_record= data.ndb_record_key;
- if (st->is->convert_range(range, key_record, &ib) == -1)
- {
- ndb_index_stat_error(st, "convert_range", __LINE__);
- DBUG_RETURN(st->error.code);
- }
- if (st->is->query_stat(range, stat) == -1)
+ do
{
- /* Invalid cache - should remove the entry */
- ndb_index_stat_error(st, "query_stat", __LINE__);
- DBUG_RETURN(st->error.code);
+ err= ndb_index_stat_wait_query(st, snap);
+ if (err != 0)
+ break;
+ assert(st->sample_version != 0);
+ uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
+ uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
+ NdbIndexStat::Bound bound_lo(st->is, bound_lo_buffer);
+ NdbIndexStat::Bound bound_hi(st->is, bound_hi_buffer);
+ NdbIndexStat::Range range(bound_lo, bound_hi);
+
+ const NdbRecord* key_record= data.ndb_record_key;
+ if (st->is->convert_range(range, key_record, &ib) == -1)
+ {
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_error(st, 1, "convert_range", __LINE__);
+ err= st->client_error.code;
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ break;
+ }
+ if (st->is->query_stat(range, stat) == -1)
+ {
+ /* Invalid cache - should remove the entry */
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_error(st, 1, "query_stat", __LINE__);
+ err= st->client_error.code;
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ break;
+ }
}
+ while (0);
- DBUG_RETURN(0);
+ /* Release reference to st */
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_ref_count(st, false);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ DBUG_RETURN(err);
}
int
@@ -2713,50 +2925,62 @@ ha_ndbcluster::ndb_index_stat_analyze(Nd
{
DBUG_ENTER("ha_ndbcluster::ndb_index_stat_analyze");
- struct {
- uint sample_version;
- uint error_count;
- } old[MAX_INDEXES];
-
- int err= 0;
- uint i;
+ struct Req {
+ Ndb_index_stat *st;
+ Ndb_index_stat_snap snap;
+ int err;
+ Req() { st= 0; err= 0; }
+ };
+ Req req[MAX_INDEXES];
/* Force stats update on each index */
- for (i= 0; i < inx_count; i++)
+ for (uint i= 0; i < inx_count; i++)
{
+ Req &r= req[i];
uint inx= inx_list[i];
const NDB_INDEX_DATA &data= m_index[inx];
const NDBINDEX *index= data.index;
DBUG_PRINT("index_stat", ("force update: %s", index->getName()));
- Ndb_index_stat *st=
- ndb_index_stat_get_share(m_share, index, m_table, err, true, true);
- if (st == 0)
- DBUG_RETURN(err);
-
- old[i].sample_version= st->sample_version;
- old[i].error_count= st->error_count;
+ r.st=
+ ndb_index_stat_get_share(m_share, index, m_table, r.snap, r.err, true, true);
+ assert((r.st != 0) == (r.err == 0));
+ /* Now holding reference to r.st if r.err == 0 */
}
- /* Wait for each update (or error) */
- for (i = 0; i < inx_count; i++)
+ /* Wait for each update */
+ for (uint i = 0; i < inx_count; i++)
{
+ Req &r= req[i];
uint inx= inx_list[i];
const NDB_INDEX_DATA &data= m_index[inx];
const NDBINDEX *index= data.index;
- DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
+ (void)index; // USED
- Ndb_index_stat *st=
- ndb_index_stat_get_share(m_share, index, m_table, err, false, false);
- if (st == 0)
- DBUG_RETURN(err);
+ if (r.err == 0)
+ {
+ DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
+ r.err=ndb_index_stat_wait_analyze(r.st, r.snap);
+ /* Release reference to r.st */
+ pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+ ndb_index_stat_ref_count(r.st, false);
+ pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+ }
+ }
- err= ndb_index_stat_wait(st, old[i].sample_version, true);
- if (err != 0)
- DBUG_RETURN(err);
+ /* Return first error if any */
+ int err= 0;
+ for (uint i= 0; i < inx_count; i++)
+ {
+ Req &r= req[i];
+ if (r.err != 0)
+ {
+ err= r.err;
+ break;
+ }
}
- DBUG_RETURN(0);
+ DBUG_RETURN(err);
}
#endif
=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/sql/ha_ndb_index_stat.h revid:mikael.ronstrom@stripped
@@ -40,10 +40,10 @@ public:
pthread_cond_t COND;
pthread_cond_t COND_ready;
- /* protect entry lists where needed */
- pthread_mutex_t list_mutex;
-
- /* protect and signal changes in stats entries */
+ /*
+ protect stats entry lists where needed
+ protect and signal changes in stats entries
+ */
pthread_mutex_t stat_mutex;
pthread_cond_t stat_cond;
@@ -82,5 +82,8 @@ compute_index_bounds(NdbIndexScanOperati
/* request on stats entry with recent error was ignored */
#define Ndb_index_stat_error_HAS_ERROR 9003
+
+/* stats thread aborted request on stats entry */
+#define Ndb_index_stat_error_ABORT_REQUEST 9004
#endif
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/sql/ha_ndbcluster.cc revid:mikael.ronstrom@stripped
@@ -1286,7 +1286,9 @@ void ha_ndbcluster::set_rec_per_key()
/* no stats is not unexpected error */
err != NdbIndexStat::NoIndexStats &&
/* warning was printed at first error */
- err != Ndb_index_stat_error_HAS_ERROR)
+ err != NdbIndexStat::MyHasError &&
+ /* stats thread aborted request */
+ err != NdbIndexStat::MyAbortReq)
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_CANT_GET_STAT, /* pun? */
@@ -12893,7 +12895,9 @@ ha_ndbcluster::records_in_range(uint inx
/* no stats is not unexpected error */
err != NdbIndexStat::NoIndexStats &&
/* warning was printed at first error */
- err != Ndb_index_stat_error_HAS_ERROR)
+ err != NdbIndexStat::MyHasError &&
+ /* stats thread aborted request */
+ err != NdbIndexStat::MyAbortReq)
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_CANT_GET_STAT, /* pun? */
=== modified file 'storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java'
--- a/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java revid:mikael.ronstrom@stripped
@@ -35,6 +35,14 @@ import java.util.Map;
*/
public class ClusterJHelper {
+ /** Return a new Dbug instance.
+ *
+ * @return a new Dbug instance
+ */
+ public static Dbug newDbug() {
+ return getServiceInstance(Dbug.class);
+ }
+
/** Locate a SessionFactory implementation by services lookup. The class loader
* used is the thread's context class loader.
*
=== added file 'storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/Dbug.java'
--- a/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/Dbug.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/Dbug.java revid:mikael.ronstrom@stripped
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj;
+
+/** Dbug allows clusterj applications to enable the DBUG functionality in cluster
+ * ndbapi library.
+ * The dbug state is a control string that consists of flags separated by colons. Flags are:
+ * <ul><li>d set the debug flag
+ * </li><li>a[,filename] append debug output to the file
+ * </li><li>A[,filename] like a[,filename] but flush the output after each operation
+ * </li><li>d[,keyword[,keyword...]] enable output from macros with specified keywords
+ * </li><li>D[,tenths] delay for specified tenths of a second after each operation
+ * </li><li>f[,function[,function...]] limit output to the specified list of functions
+ * </li><li>F mark each output with the file name of the source file
+ * </li><li>i mark each output with the process id of the current process
+ * </li><li>g[,function[,function...]] profile specified list of functions
+ * </li><li>L mark each output with the line number of the source file
+ * </li><li>n mark each output with the current function nesting depth
+ * </li><li>N mark each output with a sequential number
+ * </li><li>o[,filename] overwrite debug output to the file
+ * </li><li>O[,filename] like o[,filename] but flush the output after each operation
+ * </li><li>p[,pid[,pid...]] limit output to specified list of process ids
+ * </li><li>P mark each output with the process name
+ * </li><li>r reset the indentation level to zero
+ * </li><li>t[,depth] limit function nesting to the specified depth
+ * </li><li>T mark each output with the current timestamp
+ * </li></ul>
+ * For example, the control string to trace calls and output debug information only for
+ * "jointx" and overwrite the contents of file "/tmp/dbug/jointx", use "t:d,jointx:o,/tmp/dbug/jointx".
+ * The above can be written as ClusterJHelper.newDbug().trace().debug("jointx").output("/tmp/dbug/jointx").set();
+ */
+public interface Dbug {
+ /** Push the current state and set the parameter as the new state.
+ * @param state the new state
+ */
+ void push(String state);
+
+ /** Pop the current state. The new state will be the previously pushed state.
+ */
+ void pop();
+
+ /** Set the current state from the parameter.
+ * @param state the new state
+ */
+ void set(String state);
+
+ /** Return the current state.
+ * @return the current state
+ */
+ String get();
+
+ /** Print debug message.
+ *
+ */
+ void print(String keyword, String message);
+
+ /** Set the list of debug keywords.
+ * @param strings the debug keywords
+ * @return this
+ */
+ Dbug debug(String[] strings);
+
+ /** Set the list of debug keywords.
+ * @param string the comma separated debug keywords
+ * @return this
+ */
+ Dbug debug(String string);
+
+ /** Push the current state as defined by the methods.
+ */
+ void push();
+
+ /** Set the current state as defined by the methods.
+ */
+ void set();
+
+ /** Specify the file name for debug output (append).
+ * @param fileName the name of the file
+ * @return this
+ */
+ Dbug append(String fileName);
+
+ /** Specify the file name for debug output (overwrite).
+ * @param fileName the name of the file
+ * @return this
+ */
+ Dbug output(String fileName);
+
+ /** Force flush after each output operation.
+ * @return this
+ */
+ Dbug flush();
+
+ /** Set the trace flag.
+ * @return this
+ */
+ Dbug trace();
+
+}
=== added file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java revid:mikael.ronstrom@stripped
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package testsuite.clusterj;
+
+import com.mysql.clusterj.ClusterJHelper;
+import com.mysql.clusterj.Dbug;
+
+/**
+ * Tests dbug methods.
+ */
+public class DbugTest extends AbstractClusterJTest{
+
+ static String tmpFileName = System.getProperty("MYSQL_TMP_DIR", "/tmp") + "/clusterj-test-dbug";
+
+ public boolean getDebug() {
+ return false;
+ }
+
+ public void test() {
+ Dbug dbug = ClusterJHelper.newDbug();
+ if (dbug == null) {
+ // nothing else can be tested
+ fail("Failed to get new Dbug");
+ }
+ if (dbug.get() == null) {
+ // ndbclient is compiled without DBUG; just make sure nothing blows up
+ dbug.set("nothing");
+ dbug.push("nada");
+ dbug.pop();
+ dbug.print("keyword", "message");
+ return;
+ }
+ String originalState = "t";
+ String newState = "d,jointx:o," + tmpFileName;
+ dbug.set(originalState);
+ String actualState = dbug.get();
+ errorIfNotEqual("Failed to set original state", originalState, actualState);
+ dbug.push(newState);
+ actualState = dbug.get();
+ errorIfNotEqual("Failed to push new state", newState, actualState);
+ dbug.pop();
+ actualState = dbug.get();
+ errorIfNotEqual("Failed to pop original state", originalState, actualState);
+
+ dbug = ClusterJHelper.newDbug();
+ dbug.output(tmpFileName).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
+ actualState = dbug.get();
+ // keywords are stored LIFO
+ errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + tmpFileName, actualState);
+ dbug.pop();
+
+ dbug = ClusterJHelper.newDbug();
+ dbug.append(tmpFileName).trace().debug("a,b,c,d,e,f").set();
+ actualState = dbug.get();
+ // keywords are stored LIFO
+ errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + tmpFileName + ":t", actualState);
+ dbug.pop();
+
+ failOnError();
+ }
+
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java revid:mikael.ronstrom@stripped
@@ -45,13 +45,6 @@ public class ClusterConnectionImpl
static final Logger logger = LoggerFactoryService.getFactory()
.getInstance(com.mysql.clusterj.core.store.ClusterConnection.class);
- /** Load the ndbjtie system library */
- static {
- loadSystemLibrary("ndbclient");
- // initialize the charset map
- Utility.getCharsetMap();
- }
-
/** Ndb_cluster_connection is wrapped by ClusterConnection */
protected Ndb_cluster_connection clusterConnection;
@@ -77,40 +70,6 @@ public class ClusterConnectionImpl
logger.info(local.message("INFO_Create_Cluster_Connection", connectString, nodeId));
}
- static protected void loadSystemLibrary(String name) {
- String message;
- String path;
- try {
- System.loadLibrary(name);
- } catch (UnsatisfiedLinkError e) {
- path = getLoadLibraryPath();
- message = local.message("ERR_Failed_Loading_Library",
- name, path, "UnsatisfiedLinkError", e.getLocalizedMessage());
- logger.fatal(message);
- throw e;
- } catch (SecurityException e) {
- path = getLoadLibraryPath();
- message = local.message("ERR_Failed_Loading_Library",
- name, path, "SecurityException", e.getLocalizedMessage());
- logger.fatal(message);
- throw e;
- }
- }
-
- /**
- * @return the load library path or the Exception string
- */
- private static String getLoadLibraryPath() {
- String path;
- try {
- path = System.getProperty("java.library.path");
- } catch (Exception ex) {
- path = "<Exception: " + ex.getMessage() + ">";
- }
- return path;
- }
-
-
public void connect(int connectRetries, int connectDelay, boolean verbose) {
checkConnection();
int returnCode = clusterConnection.connect(connectRetries, connectDelay, verbose?1:0);
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java revid:mikael.ronstrom@stripped
@@ -40,7 +40,30 @@ public class ClusterConnectionServiceImp
LoggerFactoryService.getFactory().registerLogger("com.mysql.clusterj.tie");
}
+ /** Load the ndbclient system library only once */
+ static boolean ndbclientLoaded = false;
+
+ static protected void loadSystemLibrary(String name) {
+ // this is not thread-protected so it might be executed multiple times but no matter
+ if (ndbclientLoaded) {
+ return;
+ }
+ try {
+ System.loadLibrary(name);
+ // initialize the charset map
+ Utility.getCharsetMap();
+ ndbclientLoaded = true;
+ } catch (Throwable e) {
+ String path = getLoadLibraryPath();
+ String message = local.message("ERR_Failed_Loading_Library",
+ name, path, e.getClass(), e.getLocalizedMessage());
+ logger.fatal(message);
+ throw new ClusterJFatalUserException(message, e);
+ }
+ }
+
public ClusterConnection create(String connectString, int nodeId) {
+ loadSystemLibrary("ndbclient");
try {
return new ClusterConnectionImpl(connectString, nodeId);
} catch (ClusterJFatalUserException cjex) {
@@ -52,4 +75,17 @@ public class ClusterConnectionServiceImp
}
}
+ /**
+ * @return the load library path or the Exception string
+ */
+ private static String getLoadLibraryPath() {
+ String path;
+ try {
+ path = System.getProperty("java.library.path");
+ } catch (Exception ex) {
+ path = "<Exception: " + ex.getMessage() + ">";
+ }
+ return path;
+ }
+
}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbugImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbugImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbugImpl.java revid:mikael.ronstrom@stripped
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.nio.ByteBuffer;
+
+import com.mysql.clusterj.Dbug;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.mysql.Utils;
+
+/**
+ * This class encapsulates Utils dbug methods to manage dbug settings.
+ */
+public class DbugImpl implements Dbug {
+
+ /** My message translator */
+ static final I18NHelper local = I18NHelper
+ .getInstance(DbugImpl.class);
+
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance(DbugImpl.class);
+
+ private static final int DBUG_SIZE = 256;
+
+ boolean propertyTrace = false;
+ String fileName = "";
+ Character fileStrategy = 'o';
+ String debugList;
+
+ public DbugImpl() {
+ // Load the native library so we can set up debugging before anything else
+ ClusterConnectionServiceImpl.loadSystemLibrary("ndbclient");
+ }
+
+ public String get() {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(DBUG_SIZE);
+ String result = Utils.dbugExplain(buffer, DBUG_SIZE);
+ return result;
+ }
+
+ public void pop() {
+ Utils.dbugPop();
+ }
+
+ public void push(String state) {
+ Utils.dbugPush(state);
+ }
+
+ public void set(String state) {
+ Utils.dbugSet(state);
+ }
+
+ public void set() {
+ set(toState());
+ }
+
+ public void push() {
+ push(toState());
+ }
+
+ public void print(String keyword, String message) {
+ Utils.dbugPrint(keyword, message);
+ }
+
+ public Dbug trace(boolean trace) {
+ this.propertyTrace = trace;
+ return this;
+ }
+
+ public Dbug trace() {
+ return trace(true);
+ }
+
+ public Dbug output(String fileName) {
+ this.fileName = fileName;
+ this.fileStrategy = 'o';
+ return this;
+ }
+
+ public Dbug append(String fileName) {
+ this.fileName = fileName;
+ this.fileStrategy = 'a';
+ return this;
+ }
+
+ public Dbug flush() {
+ this.fileStrategy = Character.toUpperCase(this.fileStrategy);
+ return this;
+ }
+
+ public Dbug debug(String debugList) {
+ this.debugList = debugList;
+ return this;
+ }
+
+ public Dbug debug(String[] debugList) {
+ StringBuilder builder = new StringBuilder();
+ String sep = "";
+ for (String debug: debugList) {
+ builder.append(sep);
+ builder.append(debug);
+ sep = ",";
+ }
+ this.debugList = builder.toString();
+ return this;
+ }
+
+ private String toState() {
+ String separator = "";
+ StringBuilder builder = new StringBuilder();
+ if (propertyTrace) {
+ builder.append("t");
+ separator = ":";
+ }
+ if (fileName != null) {
+ builder.append(separator);
+ builder.append(fileStrategy);
+ builder.append(',');
+ builder.append(fileName);
+ separator = ":";
+ }
+ if (debugList != null) {
+ builder.append(separator);
+ builder.append("d,");
+ builder.append(debugList);
+ separator = ":";
+ }
+ return builder.toString();
+ }
+
+}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/resources/META-INF/services/com.mysql.clusterj.Dbug'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/resources/META-INF/services/com.mysql.clusterj.Dbug 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/resources/META-INF/services/com.mysql.clusterj.Dbug revid:mikael.ronstrom@stripped
@@ -0,0 +1 @@
+com.mysql.clusterj.tie.DbugImpl
=== added file 'storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/DbugTest.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/DbugTest.java revid:mikael.ronstrom@stripped
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+package testsuite.clusterj.tie;
+
+public class DbugTest extends testsuite.clusterj.DbugTest {
+
+}
=== modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp revid:mikael.ronstrom@stripped
@@ -104,7 +104,16 @@ public:
HaveSysTables = 4244, // create error if all sys tables exist
NoSysEvents = 4710,
BadSysEvents = BadSysTables,
- HaveSysEvents = 746
+ HaveSysEvents = 746,
+ /*
+ * Following are for mysqld. Most are consumed by mysqld itself
+ * and should therefore not be seen by clients.
+ */
+ MyNotAllow = 4721, // stats thread not open for requests
+ MyNotFound = 4722, // stats entry unexpectedly not found
+ MyHasError = 4723, // request ignored due to recent error
+ MyAbortReq = 4724, // request aborted by stats thread
+ AlienUpdate = 4725 // somebody else messed with stats
};
/*
@@ -180,6 +189,7 @@ public:
Uint32 m_totalBytes; // total bytes memory used
Uint64 m_save_time; // microseconds to read stats into cache
Uint64 m_sort_time; // microseconds to sort the cache
+ Uint32 m_ref_count; // in use by query_stat
// end v4 fields
};
=== added file 'storage/ndb/include/util/dbug_utils.hpp'
--- a/storage/ndb/include/util/dbug_utils.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/dbug_utils.hpp revid:mikael.ronstrom@stripped
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+/*
+ * debug_utils.hpp
+ */
+
+#ifndef dbug_utils_hpp
+#define dbug_utils_hpp
+
+#include "my_global.h"
+#include "my_dbug.h"
+
+/*
+ * These macros are robuster versions of the ones in MySQL's DBUG package.
+ *
+ * As the DBUG macros/functions don't check arguments, the caller (or JVM!)
+ * crashes in case, for instance, of NULL args. Also, macros returning a
+ * value (like DBUG_EXPLAIN) ought to do so even if DBUG_OFF was defined.
+ */
+
+#ifndef DBUG_OFF
+
+#define MY_DBUG_PUSH(a1) \
+ do { if ((a1)) DBUG_PUSH(a1); } while (0)
+#define MY_DBUG_POP() \
+ DBUG_POP()
+#define MY_DBUG_SET(a1) \
+ do { if ((a1)) DBUG_SET(a1); } while (0)
+#define MY_DBUG_EXPLAIN(buf, len) \
+ ((!(buf) || (len) <= 0) ? 1 : DBUG_EXPLAIN(buf, len))
+#define MY_DBUG_PRINT(keyword, arglist) \
+ do { if ((keyword)) DBUG_PRINT(keyword, arglist); } while (0)
+
+#else // DBUG_OFF
+
+#define MY_DBUG_PUSH(a1)
+#define MY_DBUG_POP()
+#define MY_DBUG_SET(a1)
+#define MY_DBUG_EXPLAIN(buf,len) 1
+#define MY_DBUG_PRINT(keyword, arglist)
+
+#endif // DBUG_OFF
+
+/*
+ * These DBUG functions provide suitable mapping targets for use from Java.
+ */
+
+/** Push the state of the DBUG package */
+inline
+void
+dbugPush(const char * state)
+{
+ MY_DBUG_PUSH(state);
+}
+
+/** Pop the state of the DBUG package */
+inline
+void
+dbugPop()
+{
+ MY_DBUG_POP();
+}
+
+/** Set the state of the DBUG package */
+inline
+void
+dbugSet(const char * state)
+{
+ MY_DBUG_SET(state);
+}
+
+/** Return the state of the DBUG package */
+inline
+const char *
+dbugExplain(char * buffer, int length)
+{
+ if (!MY_DBUG_EXPLAIN(buffer, length)) {
+ return buffer;
+ }
+ return NULL;
+}
+
+/** Print a message */
+inline
+void
+dbugPrint(const char * keyword, const char * message)
+{
+ MY_DBUG_PRINT(keyword, (message));
+}
+
+#endif // dbug_utils_hpp
=== modified file 'storage/ndb/include/util/decimal_utils.hpp'
--- a/storage/ndb/include/util/decimal_utils.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/include/util/decimal_utils.hpp revid:mikael.ronstrom@stripped
@@ -19,8 +19,8 @@
* decimal_utils.hpp
*/
-#ifndef decimal_utils_h
-#define decimal_utils_h
+#ifndef decimal_utils_hpp
+#define decimal_utils_hpp
/* return values (redeclared here if to be mapped to Java) */
#define E_DEC_OK 0
@@ -36,11 +36,11 @@
/*
decimal_str2bin: Convert string directly to on-disk binary format.
str - string to convert
- len - length of string
+ str_len - length of string
prec - precision of column
scale - scale of column
- dest - buffer for binary representation
- len - length of buffer
+ bin - buffer for binary representation
+ bin_len - length of buffer
NOTES
Added so that NDB API programs can convert directly between the stored
@@ -59,8 +59,8 @@ int decimal_str2bin(const char *str, int
bin_len - length to convert
prec - precision of column
scale - scale of column
- dest - buffer for string representation
- len - length of buffer
+ str - buffer for string representation
+ str_len - length of buffer
NOTES
Added so that NDB API programs can convert directly between the stored
@@ -74,4 +74,4 @@ int decimal_bin2str(const void *bin, int
int prec, int scale,
char *str, int str_len);
-#endif // decimal_utils_h
+#endif // decimal_utils_hpp
=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/CMakeLists.txt revid:mikael.ronstrom@stripped
@@ -93,6 +93,8 @@ set(NDB_MEMCACHE_SOURCE_FILES
src/hash_item_util.c
src/ndb_configuration.cc
src/ndb_engine_private.h
+ src/ndb_error_logger.cc
+ src/ndb_flush.cc
src/ndb_pipeline.cc
src/ndb_worker.cc
src/schedulers
@@ -167,6 +169,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DI
# Build the perl include file used by mtr
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/memcached_path.pl.in
${CMAKE_CURRENT_SOURCE_DIR}/memcached_path.pl)
+
+# Copy the SQL script into /scripts/ in the build directory.
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/scripts/ndb_memcache_metadata.sql
+ ${CMAKE_BINARY_DIR}/scripts/ndb_memcache_metadata.sql COPYONLY)
######### TARGETS ############
# Build the convenience library
@@ -201,19 +207,23 @@ set_target_properties(ndb_engine PROPERT
LINK_FLAGS "${FINAL_LINK_FLAGS}")
############ INSTALLER RULES #########
-# Install the ndb_engine.so module
+### Install the ndb_engine.so module
+###
install(TARGETS ndb_engine DESTINATION ${INSTALL_LIBDIR})
-# Install the metadata.sql script
-install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/scripts/ndb_memcache_metadata.sql
- DESTINATION ${INSTALL_MYSQLSHAREDIR})
### Install the memcache-api directory ################
-install(DIRECTORY DESTINATION memcache-api)
-install(PROGRAMS sandbox.sh DESTINATION memcache-api)
-install(FILES README DESTINATION memcache-api)
+###
+SET(MEMCACHE_API_DIR "${INSTALL_MYSQLSHAREDIR}/memcache-api")
+install(DIRECTORY DESTINATION ${MEMCACHE_API_DIR} )
+install(PROGRAMS sandbox.sh DESTINATION ${MEMCACHE_API_DIR})
+install(FILES README DESTINATION ${MEMCACHE_API_DIR})
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/scripts/ndb_memcache_metadata.sql
- DESTINATION memcache-api)
+ DESTINATION ${MEMCACHE_API_DIR})
+# Upgrader scripts:
+install(DIRECTORY DESTINATION ${MEMCACHE_API_DIR}/upgrade)
+install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/scripts/update_to_1.2.sql
+ DESTINATION ${MEMCACHE_API_DIR}/upgrade)
# memcached_path.pl is also installed, for use by installed mtr
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/memcached_path.pl
=== modified file 'storage/ndb/memcache/include/ExternalValue.h'
--- a/storage/ndb/memcache/include/ExternalValue.h revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/ExternalValue.h revid:mikael.ronstrom@stripped
@@ -54,6 +54,7 @@ public:
static TableSpec * createContainerRecord(const char *);
static op_status_t do_write(workitem *);
static op_status_t do_delete(workitem *);
+ static int do_delete(memory_pool *, NdbTransaction *, QueryPlan *, Operation &);
static op_status_t do_read_header(workitem *, ndb_async_callback *, worker_step *);
static void append_after_read(NdbTransaction *, workitem *);
static bool setupKey(workitem *, Operation &);
=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/Operation.h revid:mikael.ronstrom@stripped
@@ -107,6 +107,8 @@ public:
// delete
const NdbOperation *deleteTuple(NdbTransaction *tx,
NdbOperation::OperationOptions *options = 0);
+ const NdbOperation *deleteCurrentTuple(NdbScanOperation *, NdbTransaction *,
+ NdbOperation::OperationOptions *opts = 0);
// write
const NdbOperation *writeTuple(NdbTransaction *tx);
@@ -116,10 +118,11 @@ public:
NdbOperation::OperationOptions *options = 0);
// scan
- NdbScanOperation *scanTable(NdbTransaction *tx);
+ NdbScanOperation *scanTable(NdbTransaction *tx,
+ NdbOperation::LockMode lmod = NdbOperation::LM_Read,
+ NdbScanOperation::ScanOptions *options = 0);
NdbIndexScanOperation *scanIndex(NdbTransaction *tx,
NdbIndexScanOperation::IndexBound *bound);
-
};
@@ -268,8 +271,20 @@ inline const NdbOperation *
row_mask, options);
}
-inline NdbScanOperation * Operation::scanTable(NdbTransaction *tx) {
- return tx->scanTable(record->ndb_record);
+inline NdbScanOperation *
+ Operation::scanTable(NdbTransaction *tx, NdbOperation::LockMode lmode,
+ NdbScanOperation::ScanOptions *opts) {
+ return tx->scanTable(record->ndb_record, lmode,
+ read_mask_ptr, opts, 0);
}
+inline const NdbOperation *
+ Operation::deleteCurrentTuple(NdbScanOperation *scanop,
+ NdbTransaction *tx,
+ NdbOperation::OperationOptions *opts) {
+ return scanop->deleteCurrentTuple(tx, record->ndb_record, buffer,
+ read_mask_ptr, opts);
+}
+
+
#endif
=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/ndb_engine.h revid:mikael.ronstrom@stripped
@@ -64,6 +64,7 @@ struct ndb_engine {
struct {
size_t nthreads;
bool cas_enabled;
+ size_t verbose;
} server_options;
union {
@@ -82,5 +83,14 @@ struct ndb_engine {
ndbmc_atomic32_t cas_lo;
};
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern size_t global_max_item_size;
+
+#ifdef __cplusplus
+};
+#endif
#endif
=== added file 'storage/ndb/memcache/include/ndb_error_logger.h'
--- a/storage/ndb/memcache/include/ndb_error_logger.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ndb_error_logger.h revid:mikael.ronstrom@stripped
@@ -0,0 +1,53 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+#ifndef NDBMEMCACHE_NDB_ERROR_LOGGER_H
+#define NDBMEMCACHE_NDB_ERROR_LOGGER_H
+
+/* Log NDB error messages,
+ but take care to prevent flooding the log file with repeated errors.
+*/
+
+
+#include "ndbmemcache_global.h"
+#include "workitem.h"
+
+#ifdef __cplusplus
+#include <NdbApi.hpp>
+
+enum {
+ ERR_SUCCESS = ndberror_st_success, /* == 0 */
+ ERR_TEMP = ndberror_st_temporary,
+ ERR_PERM = ndberror_st_permanent,
+ ERR_UR = ndberror_st_unknown
+};
+
+int log_ndb_error(const NdbError &);
+#endif
+
+
+DECLARE_FUNCTIONS_WITH_C_LINKAGE
+
+void ndb_error_logger_init(SERVER_CORE_API *, size_t log_level);
+
+END_FUNCTIONS_WITH_C_LINKAGE
+
+
+#endif
+
=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/ndb_pipeline.h revid:mikael.ronstrom@stripped
@@ -100,7 +100,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
/** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline();
+ndb_pipeline * get_request_pipeline(int thd_id);
/** call into a pipeline for its own statistics */
void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
=== modified file 'storage/ndb/memcache/scripts/ndb_memcache_metadata.sql'
--- a/storage/ndb/memcache/scripts/ndb_memcache_metadata.sql revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/scripts/ndb_memcache_metadata.sql revid:mikael.ronstrom@stripped
@@ -252,6 +252,7 @@ INSERT INTO memcache_server_roles (role_
INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("db-only", 1);
INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("mc-only", 2);
INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("ndb-caching", 3);
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("large", 4);
-- ndb_clusters table
-- Create an entry for the primary cluster.
@@ -303,7 +304,9 @@ INSERT INTO key_prefixes (server_role_id
(3, "", 0, "caching", "demo_table"), /* ndb-caching role */
(3, "t:", 0, "caching", "demo_tabs"),
- (3, "b:", 0, "caching", "demo_ext")
+ (3, "b:", 0, "caching", "demo_ext"),
+
+ (4, "" , 0, "ndb-test", "demo_ext");
;
=== modified file 'storage/ndb/memcache/scripts/update_to_1.2.sql'
--- a/storage/ndb/memcache/scripts/update_to_1.2.sql revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/scripts/update_to_1.2.sql revid:mikael.ronstrom@stripped
@@ -48,6 +48,8 @@ CREATE TABLE IF NOT EXISTS `external_val
INSERT INTO meta VALUES ("ndbmemcache", "1.2");
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("large", 4);
+
UPDATE containers
SET expire_time_column = "expire_time",
flags = "flags"
@@ -67,6 +69,8 @@ INSERT INTO key_prefixes (server_role_id
(0, "b:", 0, "ndb-test", "demo_ext"),
(1, "b:", 0, "ndb-only", "demo_ext"),
(3, "t:", 0, "caching", "demo_tabs"),
- (3, "b:", 0, "caching", "demo_ext") ;
+ (3, "b:", 0, "caching", "demo_ext"),
+ (4, "" , 0, "ndb-test", "demo_ext");
+
=== modified file 'storage/ndb/memcache/src/ClusterConnectionPool.cc'
--- a/storage/ndb/memcache/src/ClusterConnectionPool.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ClusterConnectionPool.cc revid:mikael.ronstrom@stripped
@@ -59,7 +59,7 @@ void store_connection_pool_for_cluster(c
ClusterConnectionPool *p) {
DEBUG_ENTER();
if(name == 0) name = "[default]";
- int name_len = strlen(name);
+ // int name_len = strlen(name);
if(pthread_mutex_lock(& conn_pool_map_lock) == 0) {
if(conn_pool_map == 0) {
=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/Config_v1.cc revid:mikael.ronstrom@stripped
@@ -463,7 +463,6 @@ bool config_v1::store_prefix(const char
TableSpec *table,
int cluster_id,
char *cache_policy) {
- DEBUG_PRINT("%s", name);
KeyPrefix prefix(name);
prefix_info_t * info_ptr;
=== modified file 'storage/ndb/memcache/src/ExpireTime.cc'
--- a/storage/ndb/memcache/src/ExpireTime.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ExpireTime.cc revid:mikael.ronstrom@stripped
@@ -29,8 +29,8 @@
ExpireTime::ExpireTime(workitem *i) :
item(i),
- is_expired(false),
- ndb_expire_time(0)
+ ndb_expire_time(0),
+ is_expired(false)
{
SERVER_CORE_API * SERVER = item->pipeline->engine->server.core;
current_time = SERVER->get_current_time();
=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ExternalValue.cc revid:mikael.ronstrom@stripped
@@ -70,6 +70,42 @@ TableSpec * ExternalValue::createContain
}
+/* This is called from FLUSH_ALL.
+ It returns the number of parts deleted.
+ It uses a memory pool, passed in, to allocate key buffers.
+*/
+int ExternalValue::do_delete(memory_pool *mpool, NdbTransaction *delTx,
+ QueryPlan *plan, Operation & op) {
+ Uint32 id, nparts = 0;
+ QueryPlan * extern_plan = plan->extern_store;
+
+ if(extern_plan
+ && ! (op.isNull(COL_STORE_EXT_SIZE) || op.isNull(COL_STORE_EXT_ID))) {
+
+ /* How many parts? */
+ Uint32 stripe_size = extern_plan->val_record->value_length;
+ Uint32 len = op.getIntValue(COL_STORE_EXT_SIZE);
+ id = op.getIntValue(COL_STORE_EXT_ID);
+ nparts = len / stripe_size;
+ if(len % stripe_size) nparts += 1;
+
+ /* Delete them */
+ int key_size = extern_plan->key_record->rec_size;
+
+ for(Uint32 i = 0; i < nparts ; i++) {
+ Operation part_op(extern_plan);
+ part_op.key_buffer = (char *) memory_pool_alloc(mpool, key_size);
+
+ part_op.clearKeyNullBits();
+ part_op.setKeyPartInt(COL_STORE_KEY + 0, id);
+ part_op.setKeyPartInt(COL_STORE_KEY + 1, i);
+ part_op.deleteTuple(delTx);
+ }
+ }
+ return nparts;
+}
+
+
inline bool ExternalValue::setupKey(workitem *item, Operation &op) {
op.key_buffer = item->ndb_key_buffer;
op.clearKeyNullBits();
@@ -209,15 +245,15 @@ void ExternalValue::append_after_read(Nd
/* Constructor */
ExternalValue::ExternalValue(workitem *item, NdbTransaction *t) :
+ old_hdr(item->plan->extern_store->val_record->value_length), // (part size)
+ new_hdr(item->plan->extern_store->val_record->value_length),
+ expire_time(item),
tx(t),
wqitem(item),
- expire_time(item),
ext_plan(item->plan->extern_store),
- old_hdr(item->plan->extern_store->val_record->value_length), // (part size)
- new_hdr(item->plan->extern_store->val_record->value_length),
value(0),
- stored_cas(0),
- value_size_in_header(item->plan->row_record->value_length)
+ value_size_in_header(item->plan->row_record->value_length),
+ stored_cas(0)
{
do_server_cas = (item->prefix_info.has_cas_col && item->cas);
wqitem->ext_val = this;
=== modified file 'storage/ndb/memcache/src/QueryPlan.cc'
--- a/storage/ndb/memcache/src/QueryPlan.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/QueryPlan.cc revid:mikael.ronstrom@stripped
@@ -31,9 +31,7 @@
#include "ExternalValue.h"
extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-extern "C" {
- size_t global_max_item_size;
-}
+size_t global_max_item_size;
/* For each pair [TableSpec,NDB Object], we can cache some dictionary
lookups, acccess path information, NdbRecords, etc.
@@ -78,11 +76,11 @@ inline const NdbDictionary::Column *get_
QueryPlan::QueryPlan(Ndb *my_ndb, const TableSpec *my_spec, PlanOpts opts) :
initialized(0),
dup_numbers(false),
- db(my_ndb),
- spec(my_spec),
is_scan(false),
- static_flags(spec->static_flags)
-{
+ spec(my_spec),
+ static_flags(spec->static_flags),
+ db(my_ndb)
+{
const NdbDictionary::Column *col;
bool op_ok = false;
bool last_value_col_is_int = false;
@@ -139,7 +137,7 @@ QueryPlan::QueryPlan(Ndb *my_ndb, const
/* Key Columns */
for(int i = 0; i < spec->nkeycols ; i++) {
col = get_ndb_col(spec, table, spec->key_columns[i]);
- int this_col_id = col->getColumnNo();
+ // int this_col_id = col->getColumnNo();
key_record->addColumn(COL_STORE_KEY, col);
row_record->addColumn(COL_STORE_KEY, col);
}
=== modified file 'storage/ndb/memcache/src/Record.cc'
--- a/storage/ndb/memcache/src/Record.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/Record.cc revid:mikael.ronstrom@stripped
@@ -31,9 +31,11 @@
extern EXTENSION_LOGGER_DESCRIPTOR *logger;
Record::Record(int ncol) : ncolumns(ncol), rec_size(0), nkeys(0), nvalues(0),
- index(0), n_nullable(0),
- start_of_nullmap(0), size_of_nullmap(0),
value_length(0),
+ index(0),
+ n_nullable(0),
+ start_of_nullmap(0),
+ size_of_nullmap(0),
handlers(new DataTypeHandler *[ncol]),
specs(new NdbDictionary::RecordSpecification[ncol])
{
=== modified file 'storage/ndb/memcache/src/ndb_configuration.cc'
--- a/storage/ndb/memcache/src/ndb_configuration.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_configuration.cc revid:mikael.ronstrom@stripped
@@ -31,6 +31,7 @@
#include "NdbInstance.h"
#include "thread_identifier.h"
#include "Scheduler.h"
+#include "ExternalValue.h"
/* A static global variable */
extern EXTENSION_LOGGER_DESCRIPTOR *logger;
@@ -158,9 +159,10 @@ void disconnect_all() {
/* This function has C linkage */
void print_debug_startup_info() {
int sz[4];
- DEBUG_PRINT(" sizeof Ndb : %lu", sz[0]=sizeof(Ndb));
- DEBUG_PRINT(" sizeof NdbInstance: %lu", sz[1]=sizeof(NdbInstance));
- DEBUG_PRINT(" sizeof workitem: %lu", sizeof(workitem));
+ DEBUG_PRINT(" sizeof Ndb : %lu", sz[0]=sizeof(Ndb));
+ DEBUG_PRINT(" sizeof NdbInstance : %lu", sz[1]=sizeof(NdbInstance));
+ DEBUG_PRINT(" sizeof workitem : %lu", sizeof(workitem));
+ DEBUG_PRINT(" sizeof ExternalValue : %lu", sizeof(ExternalValue));
}
=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_engine.c revid:mikael.ronstrom@stripped
@@ -38,10 +38,10 @@
#include "Scheduler.h"
#include "thread_identifier.h"
#include "timing.h"
+#include "ndb_error_logger.h"
/* Global variables */
EXTENSION_LOGGER_DESCRIPTOR *logger;
-size_t global_max_item_size;
/* Static and local to this file */
const char * set_ops[] = { "","add","set","replace","append","prepend","cas" };
@@ -206,6 +206,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
fetch_core_settings(ndb_eng, def_eng);
nthreads = ndb_eng->server_options.nthreads;
+ /* Initialize the error handler */
+ ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
+
logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
timing_point(& pump_time);
@@ -225,7 +228,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
ndb_eng->pipelines = malloc(nthreads * sizeof(void *));
ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
for(i = 0 ; i < nthreads ; i++) {
- ndb_eng->pipelines[i] = get_request_pipeline();
+ ndb_eng->pipelines[i] = get_request_pipeline(i);
ndb_eng->schedulers[i] =
initialize_scheduler(ndb_eng->startup_options.scheduler, nthreads, i);
if(ndb_eng->schedulers[i] == 0) {
@@ -790,6 +793,9 @@ int fetch_core_settings(struct ndb_engin
{ .key = "num_threads",
.datatype = DT_SIZE,
.value.dt_size = &engine->server_options.nthreads },
+ { .key = "verbosity",
+ .datatype = DT_SIZE,
+ .value.dt_size = &engine->server_options.verbose },
{ .key = NULL }
};
=== added file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc revid:mikael.ronstrom@stripped
@@ -0,0 +1,188 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+/* System headers */
+#include <pthread.h>
+#include <time.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include "memcached/extension_loggers.h"
+#include "memcached/server_api.h"
+
+#include "ndb_error_logger.h"
+
+
+/* ***********************************************************************
+ ndb_error_logger
+
+ Log NDB error messages, but try to avoid flooding the logfile with them.
+ ***********************************************************************
+*/
+
+
+/* Memcached externals */
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+SERVER_CORE_API * core_api;
+size_t verbose_logging;
+
+/* Internal Static Globals and Declarations */
+#define ERROR_HASH_TABLE_SIZE 251
+pthread_mutex_t error_table_lock;
+class ErrorEntry;
+ErrorEntry * error_hash_table[ERROR_HASH_TABLE_SIZE];
+
+/* Prototypes */
+void manage_error(const NdbError &, const char * mesg, rel_time_t interval);
+
+
+
+/********* PUBLIC API *************************************/
+/* Initialize the NDB Error Logger */
+void ndb_error_logger_init(SERVER_CORE_API * api, size_t level) {
+ int r = pthread_mutex_init(& error_table_lock, NULL);
+ if(r) logger->log(LOG_WARNING,0, "CANNOT INIT ERROR MUTEX: %d\n", r);
+ core_api = api;
+ verbose_logging = level;
+
+ for(int i = 0; i < ERROR_HASH_TABLE_SIZE; i++)
+ error_hash_table[i] = 0;
+}
+
+
+int log_ndb_error(const NdbError &error) {
+ switch(error.status) {
+ case ndberror_st_success:
+ break;
+
+ case ndberror_st_temporary:
+ manage_error(error, "NDB Temporary Error", 10);
+ break;
+
+ case ndberror_st_permanent:
+ case ndberror_st_unknown:
+ manage_error(error, "NDB Error", 10);
+ break;
+ }
+ /* NDB classifies "Out Of Memory" (827) errors as permament errors, but we
+ reclassify them to temporary */
+ if(error.classification == NdbError::InsufficientSpace)
+ return ERR_TEMP;
+ return error.status;
+}
+
+
+/********* IMPLEMENTATION *******************************/
+
+class ErrorEntry {
+public:
+ int error_code;
+ rel_time_t first;
+ rel_time_t time[2]; /* odd and even timestamps */
+ Uint32 count;
+ ErrorEntry *next;
+
+ ErrorEntry(int code, rel_time_t tm) :
+ error_code(code), first(tm), count(1), next(0)
+ {
+ time[0] = 0;
+ time[1] = tm;
+ };
+};
+
+
+class Lock {
+public:
+ pthread_mutex_t *mutex;
+ int status;
+ Lock(pthread_mutex_t *m) : mutex(m) { status = pthread_mutex_lock(mutex); }
+ ~Lock() { pthread_mutex_unlock(mutex); }
+};
+
+ErrorEntry * error_table_lookup(int code, rel_time_t now);
+
+
+/* Lock the error table and look up an error.
+ If found, increment the count and set either the odd or even timestamp.
+ If not found, create.
+*/
+ErrorEntry * error_table_lookup(int code, rel_time_t now) {
+ int hash_val = code % ERROR_HASH_TABLE_SIZE;
+ Lock lock(& error_table_lock);
+ ErrorEntry *sym;
+
+ for(sym = error_hash_table[hash_val] ; sym != 0 ; sym = sym->next) {
+ if(sym->error_code == code) {
+ sym->time[(++(sym->count)) % 2] = now;
+ return sym;
+ }
+ }
+
+ /* Create */
+ sym = new ErrorEntry(code, now);
+ sym->next = error_hash_table[hash_val];
+ error_hash_table[hash_val] = sym;
+ return sym;
+}
+
+
+/* Record the error message, and possibly log it. */
+void manage_error(const NdbError & error, const char *type_mesg, rel_time_t interval) {
+ char note[256];
+ ErrorEntry *entry = 0;
+ bool first_ever, interval_passed, flood = false;
+ int current = 0, prior = 0; // array indexes
+
+ if(verbose_logging == 0) {
+ entry = error_table_lookup(error.code, core_api->get_current_time());
+
+ if((entry->count | 1) == entry->count)
+ current = 1; // odd count
+ else
+ prior = 1; // even count
+
+ /* We have four pieces of information: the first timestamp, the two
+ most recent timestamps, and the error count. When to write a log message?
+ (A) On the first occurrence of an error.
+ (B) If a time > interval has passed since the previous message.
+ (C) At certain count numbers in error flood situations
+ */
+ first_ever = (entry->count == 1);
+ interval_passed = (entry->time[current] - entry->time[prior] > interval);
+ if(! interval_passed)
+ for(Uint32 i = 10 ; i <= entry->count ; i *= 10)
+ if(entry->count < (10 * i) && (entry->count % i == 0))
+ { flood = true; break; }
+ }
+
+ if(verbose_logging || first_ever || interval_passed || flood)
+ {
+ if(flood)
+ snprintf(note, 256, "[occurrence %d of this error]", entry->count);
+ else
+ note[0] = '\0';
+ logger->log(LOG_WARNING, 0, "%s %d: %s %s\n",
+ type_mesg, error.code, error.message, note);
+ }
+}
+
=== added file 'storage/ndb/memcache/src/ndb_flush.cc'
--- a/storage/ndb/memcache/src/ndb_flush.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_flush.cc revid:mikael.ronstrom@stripped
@@ -0,0 +1,290 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+
+/* System headers */
+#define __STDC_FORMAT_MACROS
+#include <assert.h>
+#include <ctype.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB headers */
+#include "NdbApi.hpp"
+
+/* NDB Memcache headers */
+#include "ndbmemcache_global.h"
+#include "Configuration.h"
+#include "ExternalValue.h"
+#include "debug.h"
+#include "Operation.h"
+#include "NdbInstance.h"
+#include "ndb_pipeline.h"
+#include "ndb_error_logger.h"
+#include "ndb_worker.h"
+
+/* Extern pointers */
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+/* Scan helpers */
+
+// nextResult() return values:
+enum {
+ fetchError = -1,
+ fetchOK = 0,
+ fetchScanFinished = 1,
+ fetchCacheEmpty = 2
+};
+
+enum { fetchFromThisBatch = false, fetchNewBatchFromKernel = true };
+enum { SendImmediate = true, sendDeferred = false };
+
+bool scan_delete(NdbInstance *, QueryPlan *);
+bool scan_delete_ext_val(ndb_pipeline *, NdbInstance *, QueryPlan *);
+
+
+/*************** SYNCHRONOUS IMPLEMENTATION OF "FLUSH ALL" ******************/
+
+/* Flush all is a fully synchronous operation --
+ the memcache server is waiting for a response, and the thread is blocked.
+ */
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
+ DEBUG_ENTER();
+ bool r;
+ const Configuration &conf = get_Configuration();
+
+ DEBUG_PRINT(" %d prefixes", conf.nprefixes);
+ for(unsigned int i = 0 ; i < conf.nprefixes ; i++) {
+ const KeyPrefix *pfx = conf.getPrefix(i);
+ if(pfx->info.use_ndb && pfx->info.do_db_flush) {
+ ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
+ Ndb_cluster_connection *conn = pool->getMainConnection();
+ NdbInstance inst(conn, 128);
+ QueryPlan plan(inst.db, pfx->table);
+ if(plan.pk_access) {
+ // To flush, scan the table and delete every row
+ if(plan.canHaveExternalValue()) {
+ DEBUG_PRINT("prefix %d - doing ExternalValue delete");
+ r = scan_delete_ext_val(pipeline, &inst, &plan);
+ }
+ else {
+ DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
+ r = scan_delete(&inst, &plan);
+ }
+ if(! r) logger->log(LOG_WARNING, 0, "-- FLUSH_ALL Failed.\n");
+ }
+ else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path "
+ "is not primary key", i, pfx->table->table_name);
+ }
+ else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
+ i, pfx->table ? pfx->table->table_name : "",
+ pfx->info.use_ndb, pfx->info.do_db_flush);
+ }
+
+ return ENGINE_SUCCESS;
+}
+
+
+bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
+ DEBUG_ENTER();
+ bool rescan, fetch_option;
+ int rFetch, rExec, rDel, batch_size, rows_deleted;
+ int error_status = 0;
+ const unsigned int max_errors = 100000;
+
+ struct {
+ unsigned int errors;
+ unsigned int rows;
+ unsigned short scans;
+ unsigned short commit_batches;
+ } stats = {0, 0, 0, 0 };
+
+ /* The outer loop performs an initial table scan and then possibly some
+ rescans, which are triggered whenever some rows have been scanned but,
+ due to an error condition, have not been deleted.
+ */
+ do {
+ batch_size = 1; /* slow start. */
+ stats.scans += 1;
+ rescan = false;
+
+ NdbTransaction *scanTx = inst->db->startTransaction();
+ NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
+
+ /* Express intent to read with exclusive lock; execute NoCommit */
+ scan->readTuplesExclusive();
+ rExec = scanTx->execute(NdbTransaction::NoCommit);
+ if(rExec != 0) {
+ stats.errors++;
+ error_status = log_ndb_error(scanTx->getNdbError());
+ break;
+ }
+
+ /* Within a scan, this loop iterates over batches.
+ Batches are committed whenever the batch_size has been reached.
+ Batch size starts at 1 and doubles when a batch is succesful,
+ until it reaches the result cache size.
+ */
+ while(1) {
+ stats.commit_batches++;
+ NdbTransaction *delTx = inst->db->startTransaction();
+ rows_deleted = 0;
+ fetch_option = fetchNewBatchFromKernel;
+ bool fetch_more;
+
+ /* The inner loop iterates over rows within a batch */
+ do {
+ fetch_more = false;
+ rFetch = scan->nextResult(fetch_option, SendImmediate);
+ switch(rFetch) {
+ case fetchError:
+ stats.errors++;
+ error_status = log_ndb_error(scan->getNdbError());
+ break;
+
+ case fetchOK:
+ rDel = scan->deleteCurrentTuple(delTx);
+ if(rDel == 0) {
+ fetch_more = ( ++rows_deleted < batch_size);
+ fetch_option = fetchFromThisBatch;
+ }
+ else {
+ stats.errors++;
+ error_status = log_ndb_error(delTx->getNdbError());
+ }
+ break;
+
+ case fetchScanFinished:
+ case fetchCacheEmpty:
+ default:
+ break;
+ }
+ } while(fetch_more); /* break out of the inner loop to here */
+
+ /* Quit now if errors were serious */
+ if(error_status > ERR_TEMP)
+ break;
+
+ /* Execute the batch */
+ rExec = delTx->execute(NdbTransaction::Commit, NdbOperation::AbortOnError, 1);
+ if(rExec == 0) {
+ stats.rows += rows_deleted;
+ if(rFetch != fetchCacheEmpty)
+ batch_size *= 2;
+ }
+ else {
+ stats.errors++;
+ error_status = log_ndb_error(delTx->getNdbError());
+ if(batch_size > 1)
+ batch_size /= 2;
+ rescan = true;
+ }
+
+ delTx->close();
+
+ if(rFetch == fetchScanFinished || (stats.errors > max_errors))
+ break;
+ } /* break out of the batch loop to here */
+
+ scanTx->close();
+ } while(rescan && (error_status < ERR_PERM) && stats.errors < max_errors);
+
+ logger->log(LOG_WARNING, 0, "Flushed rows from %s.%s: "
+ "Scans: %d Batches: %d Rows: %d Errors: %d",
+ plan->spec->schema_name, plan->spec->table_name,
+ stats.scans, stats.commit_batches, stats.rows, stats.errors);
+
+ return (stats.rows || ! stats.errors);
+}
+
+
+/* External Values require a different version of FLUSH_ALL, which preserves
+ the referential integrity between the main table and the parts table
+ while deleting. This one uses the NdbRecord variant of a scan and commits
+ once for each row of the main table.
+*/
+bool scan_delete_ext_val(ndb_pipeline *pipeline, NdbInstance *inst,
+ QueryPlan *plan) {
+ DEBUG_ENTER();
+ int r, ext_rows, error_status = 0;
+ bool fetch_more;
+ struct {
+ Uint32 main_rows;
+ Uint32 ext_rows;
+ Uint32 errors;
+ } stats = {0, 0, 0 };
+
+ /* Need KeyInfo when performing scanning delete */
+ NdbScanOperation::ScanOptions opts;
+ opts.optionsPresent=NdbScanOperation::ScanOptions::SO_SCANFLAGS;
+ opts.scan_flags=NdbScanOperation::SF_KeyInfo;
+
+ memory_pool * pool = pipeline_create_memory_pool(pipeline);
+ NdbTransaction *scanTx = inst->db->startTransaction();
+ Operation op(plan, OP_SCAN);
+ op.readSelectedColumns();
+ op.readColumn(COL_STORE_EXT_SIZE);
+ op.readColumn(COL_STORE_EXT_ID);
+
+ NdbScanOperation *scan = op.scanTable(scanTx, NdbOperation::LM_Exclusive, &opts);
+ r = scanTx->execute(NdbTransaction::NoCommit);
+
+ if(r == 0) { /* Here's the scan loop */
+ do {
+ fetch_more = false;
+ r = scan->nextResult((const char **) & op.buffer, true, true);
+ if(r == fetchOK) {
+ fetch_more = true;
+ NdbTransaction * delTx = inst->db->startTransaction();
+
+ op.deleteCurrentTuple(scan, delTx); // main row
+ ext_rows = ExternalValue::do_delete(pool, delTx, plan, op); // parts
+
+ r = delTx->execute(NdbTransaction::Commit,
+ NdbOperation::AbortOnError,
+ SendImmediate);
+ if(r)
+ error_status = log_ndb_error(delTx->getNdbError()), stats.errors++;
+ else
+ stats.main_rows++, stats.ext_rows += ext_rows;
+
+ memory_pool_free(pool);
+ delTx->close();
+ }
+ else {
+ break;
+ }
+ } while(fetch_more && (error_status < ERR_PERM));
+ }
+
+ memory_pool_destroy(pool);
+ scanTx->close();
+
+ logger->log(LOG_WARNING, 0, "Flushed %d rows from %s plus "
+ "%d rows from %s. Errors: %d\n",
+ stats.main_rows, plan->spec->table_name,
+ stats.ext_rows, plan->extern_store->spec->table_name,
+ stats.errors);
+
+ return (stats.main_rows || ! stats.errors);
+}
+
+
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc revid:mikael.ronstrom@stripped
@@ -73,7 +73,7 @@ void init_pool_header(allocation_referen
/* initialize a new pipeline for an NDB engine thread */
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
bool did_inc;
- int id;
+ unsigned int id;
thread_identifier * tid;
/* Get my pipeline id */
@@ -85,15 +85,14 @@ ndb_pipeline * ndb_pipeline_initialize(s
/* Fetch the partially initialized pipeline */
ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
- /* Set my id */
- self->id = id;
-
+ assert(self->id == id);
+
/* Set the pointer back to the engine */
self->engine = engine;
/* And the thread id */
self->engine_thread_id = pthread_self();
-
+
/* Create and set a thread identity */
tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
tid->pipeline = self;
@@ -109,13 +108,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
/* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline() {
+ndb_pipeline * get_request_pipeline(int thd_id) {
/* Allocate the pipeline */
ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline));
- /* Initialize */
+ /* Initialize */
self->engine = 0;
- self->id = 0;
+ self->id = thd_id;
self->nworkitems = 0;
/* Say hi to the alligator */
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_worker.cc revid:mikael.ronstrom@stripped
@@ -54,6 +54,7 @@
#include "ndb_engine.h"
#include "hash_item_util.h"
#include "ndb_worker.h"
+#include "ndb_error_logger.h"
/**********************************************************
Schedduler::schedule()
@@ -122,7 +123,6 @@ worker_step worker_finalize_write;
/* Misc utility functions */
void worker_set_cas(ndb_pipeline *, uint64_t *);
int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
-bool scan_delete(NdbInstance *, QueryPlan *);
void build_hash_item(workitem *, Operation &, ExpireTime &);
/* Extern pointers */
@@ -166,6 +166,9 @@ status_block status_block_idx_insert =
status_block status_block_too_big =
{ ENGINE_E2BIG, "Value too large" };
+status_block status_block_no_mem =
+ { ENGINE_ENOMEM, "NDB out of data memory" };
+
void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {
/* Be careful here -- ndbmc_atomic32_t might be a signed type.
Shitfting of signed types behaves differently. */
@@ -676,7 +679,7 @@ op_status_t WorkerStep1::do_math() {
void callback_main(int, NdbTransaction *tx, void *itemptr) {
workitem *wqitem = (workitem *) itemptr;
- ndb_pipeline * & pipeline = wqitem->pipeline;
+ // ndb_pipeline * & pipeline = wqitem->pipeline;
/************** Error handling ***********/
/* No Error */
@@ -723,10 +726,21 @@ void callback_main(int, NdbTransaction *
else if(tx->getNdbError().code == 897) {
wqitem->status = & status_block_idx_insert;
}
+ /* Out of memory */
+ else if(tx->getNdbError().code == 827) {
+ log_ndb_error(tx->getNdbError());
+ wqitem->status = & status_block_no_mem;
+ }
+ /* 284: Table not defined in TC (stale definition) */
+ else if(tx->getNdbError().code == 284) {
+ /* TODO: find a way to handle this error, after an ALTER TABLE */
+ log_ndb_error(tx->getNdbError());
+ wqitem->status = & status_block_misc_error;
+ }
+
/* Some other error */
else {
- DEBUG_PRINT("[%d]: %s",
- tx->getNdbError().code, tx->getNdbError().message);
+ log_ndb_error(tx->getNdbError());
wqitem->status = & status_block_misc_error;
}
@@ -736,7 +750,7 @@ void callback_main(int, NdbTransaction *
void callback_incr(int result, NdbTransaction *tx, void *itemptr) {
workitem *wqitem = (workitem *) itemptr;
- ndb_pipeline * & pipeline = wqitem->pipeline;
+ // ndb_pipeline * & pipeline = wqitem->pipeline;
/* read insert update cr_flag response
------------------------------------------------------------------------
@@ -1105,151 +1119,3 @@ int build_cas_routine(NdbInterpretedCode
return r->finalise(); // resolve the label/branch
}
-
-/*************** SYNCHRONOUS IMPLEMENTATION OF "FLUSH ALL" ******************/
-
-/* Flush all is a fully synchronous operation --
- the memcache server is waiting for a response, and the thread is blocked.
-*/
-ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
- DEBUG_ENTER();
- const Configuration &conf = get_Configuration();
-
- DEBUG_PRINT(" %d prefixes", conf.nprefixes);
- for(unsigned int i = 0 ; i < conf.nprefixes ; i++) {
- const KeyPrefix *pfx = conf.getPrefix(i);
- if(pfx->info.use_ndb && pfx->info.do_db_flush) {
- ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
- Ndb_cluster_connection *conn = pool->getMainConnection();
- NdbInstance inst(conn, 128);
- QueryPlan plan(inst.db, pfx->table);
- if(plan.pk_access) {
- // To flush, scan the table and delete every row
- DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
- scan_delete(&inst, &plan);
- // If there is an external store, also delete from the large value table
- if(plan.canHaveExternalValue()) {
- DEBUG_PRINT("prefix %d - deleting from %s", i,
- plan.extern_store->spec->table_name);
- scan_delete(&inst, plan.extern_store);
- }
- }
- else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path "
- "is not primary key", i, pfx->table->table_name);
- }
- else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
- i, pfx->table ? pfx->table->table_name : "",
- pfx->info.use_ndb, pfx->info.do_db_flush);
- }
-
- return ENGINE_SUCCESS;
-}
-
-
-bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
- DEBUG_ENTER();
- int check;
- bool rescan;
- int res = 0;
- const int max_batch_size = 1000;
- int batch_size = 1;
- int delTxRowCount = 0;
- int force_send = 1;
- struct {
- unsigned short scans;
- unsigned short errors;
- unsigned short rows;
- unsigned short commit_batches;
- } stats = {0, 0, 0, 0 };
-
- /* To securely scan a whole table, use an outer transaction only for the scan,
- but take over each lock in an inner transaction (with a row count) that
- deletes 1000 rows per transaction
- */
- do {
- stats.scans += 1;
- rescan = false;
- NdbTransaction *scanTx = inst->db->startTransaction();
- NdbTransaction *delTx = inst->db->startTransaction();
- NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
- scan->readTuplesExclusive();
-
- /* execute NoCommit */
- if((res = scanTx->execute(NdbTransaction::NoCommit)) != 0)
- logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n",
- scanTx->getNdbError().message);
-
- /* scan and delete. delTx takes over the lock. */
- while(scan->nextResult(true) == 0) {
- do {
- if((res = scan->deleteCurrentTuple(delTx)) == 0) {
- delTxRowCount += 1;
- }
- else {
- logger->log(LOG_WARNING, 0, "deleteCurrentTuple(): %s\n",
- scanTx->getNdbError().message);
- }
- } while((check = scan->nextResult(false)) == 0);
-
- /* execute a batch (NoCommit) */
- if(check != -1) {
- res = delTx->execute(NdbTransaction::NoCommit,
- NdbOperation::AbortOnError, force_send);
- if(res != 0) {
- stats.errors += 1;
- if(delTx->getNdbError().code == 827) {
- /* DataMemory is full, and the kernel could not create a Copy Tuple
- for a deleted row. Rollback this batch, turn off force-send
- (for throttling), make batches smalller, and trigger a
- rescan to clean up these rows. */
- rescan = true;
- delTx->execute(NdbTransaction::Rollback);
- delTx->close();
- delTx = inst->db->startTransaction();
- delTxRowCount = 0;
- if(batch_size > 1) batch_size /= 2;
- force_send = 0;
- }
- else {
- logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n",
- delTx->getNdbError().message);
- }
- }
- }
-
- /* Execute & commit a batch */
- if(delTxRowCount >= batch_size) {
- stats.commit_batches += 1;
- res = delTx->execute(NdbTransaction::Commit,
- NdbOperation::AbortOnError, force_send);
- if(res != 0) {
- stats.errors++;
- logger->log(LOG_WARNING, 0, "execute(Commit): %s\n",
- delTx->getNdbError().message);
- }
- stats.rows += delTxRowCount;
- delTxRowCount = 0;
- delTx->close();
- delTx = inst->db->startTransaction();
- batch_size *= 2;
- if(batch_size > max_batch_size) {
- batch_size = max_batch_size;
- force_send = 1;
- }
- }
- }
- /* Final execute & commit */
- res = delTx->execute(NdbTransaction::Commit);
- delTx->close();
- scanTx->close();
-
- } while(rescan);
-
- logger->log(EXTENSION_LOG_INFO, 0, "Flushed all rows from %s.%s: "
- "Scans: %d Batches: %d Rows: %d Errors: %d",
- plan->spec->schema_name, plan->spec->table_name,
- stats.scans, stats.commit_batches, stats.rows, stats.errors);
-
- return (res == 0);
-}
-
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc revid:mikael.ronstrom@stripped
@@ -125,7 +125,7 @@ void S::SchedulerGlobal::reconfigure(Con
void S::SchedulerGlobal::shutdown() {
if(running) {
- logger->log(LOG_WARNING, 0, "Shutting down scheduler.");
+ logger->log(LOG_INFO, 0, "Shutting down scheduler.");
/* First shut down each WorkerConnection */
for(int i = 0; i < nclusters ; i++) {
@@ -249,8 +249,6 @@ void S::SchedulerGlobal::add_stats(const
void S::SchedulerWorker::init(int my_thread,
int nthreads,
const char * config_string) {
-
- DEBUG_ENTER_METHOD("S::SchedulerWorker::init");
/* On the first call in, initialize the SchedulerGlobal.
* This will start the send & poll threads for each connection.
*/
@@ -298,6 +296,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
pthread_rwlock_unlock(& reconf_lock);
}
else {
+ logger->log(LOG_INFO, 0, "S Scheduler could not acquire read lock");
return ENGINE_TMPFAIL;
}
/* READ LOCK RELEASED */
@@ -316,6 +315,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
all we can do is return an error.
(Or, alternately, the scheduler may be shutting down.)
*/
+ logger->log(LOG_INFO, 0, "No free NDB instances.");
return ENGINE_TMPFAIL;
}
@@ -426,9 +426,9 @@ void S::SchedulerWorker::add_stats(const
/* Cluster methods */
S::Cluster::Cluster(SchedulerGlobal *global, int _id) :
- cluster_id(_id),
- nreferences(0),
- threads_started(false)
+ threads_started(false),
+ cluster_id(_id),
+ nreferences(0)
{
DEBUG_PRINT("%d", cluster_id);
@@ -514,7 +514,6 @@ void S::Cluster::add_stats(const char *s
S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
int thd_id, int cluster_id) {
- DEBUG_ENTER_METHOD("S::WorkerConnection::WorkerConnection");
S::Cluster *cl = global->clusters[cluster_id];
Configuration *conf = global->conf;
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc revid:mikael.ronstrom@stripped
@@ -88,7 +88,7 @@ void Scheduler_stockholm::init(int my_th
TODO? Start one tx on each data node.
*/
QueryPlan *plan;
- const KeyPrefix *default_prefix = conf.getDefaultPrefix(); // TODO: something
+// const KeyPrefix *default_prefix = conf.getDefaultPrefix(); // TODO: something
for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL);
if(prefix) {
=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/unit/alloc.cc revid:mikael.ronstrom@stripped
@@ -28,7 +28,7 @@
#include "all_tests.h"
int run_allocator_test(QueryPlan *, Ndb *, int v) {
- struct request_pipeline *p = get_request_pipeline();
+ struct request_pipeline *p = get_request_pipeline(0);
memory_pool *p1 = pipeline_create_memory_pool(p);
int sz = 13;
=== modified file 'storage/ndb/src/common/util/testMysqlUtils/mysql_utils_test.cpp'
--- a/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_test.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_test.cpp revid:mikael.ronstrom@stripped
@@ -23,6 +23,7 @@
#include <stdlib.h> // not using namespaces yet
#include <assert.h> // not using namespaces yet
+#include "dbug_utils.hpp"
#include "decimal_utils.hpp"
#include "CharsetMap.hpp"
@@ -30,6 +31,48 @@
#include "my_sys.h"
#include "mysql.h"
+void test_dbug_utils()
+{
+ const int DBUG_BUF_SIZE = 1024;
+ char buffer[DBUG_BUF_SIZE];
+
+ const char * s0 = "";
+ const char * s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s0));
+
+ s1 = dbugExplain(NULL, DBUG_BUF_SIZE);
+ assert(s1 == NULL);
+
+ s1 = dbugExplain(buffer, 0);
+ assert(s1 == NULL);
+
+ s0 = "t";
+ dbugSet(s0);
+ s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s0));
+
+ dbugSet(NULL);
+ s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s0));
+
+ const char * s2 = "d,jointx:o,/tmp/jointx";
+ dbugPush(s2);
+ s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s2));
+
+ dbugPush(NULL);
+ s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s2));
+
+ dbugPop();
+ s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s0));
+
+ dbugPush(NULL);
+ s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+ assert(!strcmp(s1, s0));
+}
+
void test_decimal(const char *s, int prec, int scale, int expected_rv)
{
char bin_buff[128], str_buff[128];
@@ -51,14 +94,15 @@ void test_decimal(const char *s, int pre
}
}
-
int main()
{
- printf("==== init MySQL lib ====\n");
+ printf("==== BEGIN: MySQL Utils Unit Test ====\n");
+
+ printf("\n==== init MySQL lib, CharsetMap ====\n");
my_init();
CharsetMap::init();
- printf("==== decimal_str2bin() / decimal_bin2str() ====\n");
+ printf("\n==== decimal_str2bin() / decimal_bin2str() ====\n");
test_decimal("100", 3, -1, E_DEC_BAD_SCALE);
test_decimal("3.3", 2, 1, E_DEC_OK);
@@ -128,7 +172,7 @@ int main()
lengths[1] = 32;
CharsetMap::RecodeStatus rr1 = csmap.recode(lengths, utf8_num, latin1_num,
my_word_utf8, result_buff_1);
- printf("Recode Test 1 - UTF-8 to Latin-1: %d %ld %ld \"%s\" => \"%s\" \n",
+ printf("Recode Test 1 - UTF-8 to Latin-1: %d %d %d \"%s\" => \"%s\" \n",
rr1, lengths[0], lengths[1], my_word_utf8, result_buff_1);
assert(rr1 == CharsetMap::RECODE_OK);
assert(lengths[0] == 7);
@@ -140,7 +184,7 @@ int main()
lengths[1] = 32;
CharsetMap::RecodeStatus rr2 = csmap.recode(lengths, latin1_num, utf8_num,
my_word_latin1, result_buff_2);
- printf("Recode Test 2 - Latin-1 to UTF-8: %d %ld %ld \"%s\" => \"%s\" \n",
+ printf("Recode Test 2 - Latin-1 to UTF-8: %d %d %d \"%s\" => \"%s\" \n",
rr2, lengths[0], lengths[1], my_word_latin1, result_buff_2);
assert(rr2 == CharsetMap::RECODE_OK);
assert(lengths[0] == 6);
@@ -152,7 +196,7 @@ int main()
lengths[1] = 4;
CharsetMap::RecodeStatus rr3 = csmap.recode(lengths, latin1_num, utf8_num,
my_word_latin1, result_buff_too_small);
- printf("Recode Test 3 - too-small buffer: %d %ld %ld \"%s\" => \"%s\" \n",
+ printf("Recode Test 3 - too-small buffer: %d %d %d \"%s\" => \"%s\" \n",
rr3, lengths[0], lengths[1], my_word_latin1, result_buff_too_small);
assert(rr3 == CharsetMap::RECODE_BUFF_TOO_SMALL);
assert(lengths[0] == 3);
@@ -201,7 +245,11 @@ int main()
// If there is not at least one of each, then something is probably wrong
assert(nNull && nSingle && nMulti);
+ printf("\n==== DBUG Utilities ====\n");
+ test_dbug_utils();
-
+ printf("\n==== unload CharsetMap ====\n");
CharsetMap::unload();
+
+ printf("\n==== END: MySQL Utils Unit Test ====\n");
}
=== modified file 'storage/ndb/src/common/util/testMysqlUtils/mysql_utils_unit_tests-t'
--- a/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_unit_tests-t revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_unit_tests-t revid:mikael.ronstrom@stripped
@@ -39,7 +39,7 @@ echo "running test from directory:" >> "
echo " $script_dir" >> "$log" 2>&1
test_name="test_mysql_utils"
-script_name="$script_dir/$test_mysql_utils.sh"
+script_name="$script_dir/$test_name.sh"
if [ ! -x "$script_name" ];
then
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp revid:mikael.ronstrom@stripped
@@ -27,8 +27,6 @@ LocalProxy::LocalProxy(BlockNumber block
for (i = 0; i < MaxWorkers; i++)
c_worker[i] = 0;
- c_ssIdSeq = 0;
-
c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
c_masterNodeId = ZNIL;
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp revid:mikael.ronstrom@stripped
@@ -192,11 +192,17 @@ protected:
template <class Ss>
Ss& ssSeize() {
- const Uint32 base = SsIdBase;
- const Uint32 mask = ~base;
- const Uint32 ssId = base | c_ssIdSeq;
- c_ssIdSeq = (c_ssIdSeq + 1) & mask;
- return ssSeize<Ss>(ssId);
+ SsPool<Ss>& sp = Ss::pool(this);
+ Ss* ssptr = ssSearch<Ss>(0);
+ ndbrequire(ssptr != 0);
+ // Use position in array as ssId
+ UintPtr pos = ssptr - sp.m_pool;
+ Uint32 ssId = Uint32(pos) + 1;
+ new (ssptr) Ss;
+ ssptr->m_ssId = ssId;
+ sp.m_usage++;
+ D("ssSeize()" << V(sp.m_usage) << hex << V(ssId) << " " << Ss::name());
+ return *ssptr;
}
template <class Ss>
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp revid:mikael.ronstrom@stripped
@@ -1886,7 +1886,8 @@ private:
} m_local_lcp_state;
// MT LQH
- Uint32 c_fragments_per_node;
+ Uint32 c_fragments_per_node_;
+ Uint32 getFragmentsPerNode();
Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
ndbrequire(!tFragPtr.isNull());
Uint32 log_part_id = tFragPtr.p->m_log_part_id;
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp revid:mikael.ronstrom@stripped
@@ -319,7 +319,7 @@ Dbdih::Dbdih(Block_context& ctx):
nodeGroupRecord = 0;
nodeRecord = 0;
c_nextNodeGroup = 0;
- c_fragments_per_node = 1;
+ c_fragments_per_node_ = 0;
bzero(c_node_groups, sizeof(c_node_groups));
if (globalData.ndbMtTcThreads == 0)
{
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp revid:mikael.ronstrom@stripped
@@ -1312,14 +1312,13 @@ void Dbdih::execREAD_CONFIG_REQ(Signal*
if (isNdbMtLqh())
{
jam();
- c_fragments_per_node = getLqhWorkers();
+ c_fragments_per_node_ = 0;
// try to get some LQH workers which initially handle no fragments
if (ERROR_INSERTED(7215)) {
- c_fragments_per_node = 1;
+ c_fragments_per_node_ = 1;
+ ndbout_c("Using %u fragments per node", c_fragments_per_node_);
}
}
- ndbout_c("Using %u fragments per node", c_fragments_per_node);
-
ndb_mgm_get_int_parameter(p, CFG_DB_LCP_TRY_LOCK_TIMEOUT,
&c_lcpState.m_lcp_trylock_timeout);
@@ -7545,6 +7544,42 @@ static Uint32 find_min_index(const Uint3
return m;
}
+Uint32
+Dbdih::getFragmentsPerNode()
+{
+ jam();
+ if (c_fragments_per_node_ != 0)
+ {
+ return c_fragments_per_node_;
+ }
+
+ c_fragments_per_node_ = getLqhWorkers();
+ if (c_fragments_per_node_ == 0)
+ c_fragments_per_node_ = 1; // ndbd
+
+ NodeRecordPtr nodePtr;
+ nodePtr.i = cfirstAliveNode;
+ do
+ {
+ jam();
+ ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
+ Uint32 workers = getNodeInfo(nodePtr.i).m_lqh_workers;
+ if (workers == 0) // ndbd
+ workers = 1;
+
+ c_fragments_per_node_ = MIN(workers, c_fragments_per_node_);
+ nodePtr.i = nodePtr.p->nextNode;
+ } while (nodePtr.i != RNIL);
+
+ if (c_fragments_per_node_ == 0)
+ {
+ ndbassert(false);
+ c_fragments_per_node_ = 1;
+ }
+ ndbout_c("Using %u fragments per node", c_fragments_per_node_);
+ return c_fragments_per_node_;
+}
+
void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal)
{
Uint16 node_group_id[MAX_NDB_PARTITIONS];
@@ -7561,11 +7596,9 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
const Uint32 flags = req->requestInfo;
Uint32 err = 0;
- const Uint32 defaultFragments =
- c_fragments_per_node * cnoOfNodeGroups * cnoReplicas;
- const Uint32 maxFragments =
- MAX_FRAG_PER_LQH * (getLqhWorkers() ? getLqhWorkers() : 1) *
- cnoOfNodeGroups * cnoReplicas;
+ const Uint32 defaultFragments =
+ getFragmentsPerNode() * cnoOfNodeGroups * cnoReplicas;
+ const Uint32 maxFragments = MAX_FRAG_PER_LQH * defaultFragments;
do {
NodeGroupRecordPtr NGPtr;
@@ -16190,8 +16223,8 @@ void Dbdih::execCHECKNODEGROUPSREQ(Signa
case CheckNodeGroups::GetDefaultFragments:
jam();
ok = true;
- sd->output = (cnoOfNodeGroups + sd->extraNodeGroups)
- * c_fragments_per_node * cnoReplicas;
+ sd->output = (cnoOfNodeGroups + sd->extraNodeGroups)
+ * getFragmentsPerNode() * cnoReplicas;
break;
}
ndbrequire(ok);
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp revid:mikael.ronstrom@stripped
@@ -4397,6 +4397,7 @@ MgmtSrvr::request_events(NdbNodeBitmask
Vector<SimpleSignal>& events)
{
int nodes_counter[MAX_NDB_NODES];
+ NdbNodeBitmask save = nodes;
SignalSender ss(theFacade);
ss.lock();
@@ -4447,6 +4448,10 @@ MgmtSrvr::request_events(NdbNodeBitmask
if (!nodes.get(nodeid))
{
// The reporting node was not expected
+#ifndef NDEBUG
+ ndbout_c("nodeid: %u", nodeid);
+ ndbout_c("save: %s", BaseString::getPrettyText(save).c_str());
+#endif
assert(false);
return false;
}
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp revid:mikael.ronstrom@stripped
@@ -369,6 +369,7 @@ NdbIndexStat::clean_cache()
void
NdbIndexStat::get_cache_info(CacheInfo& info, CacheType type) const
{
+ NdbMutex_Lock(m_impl.m_query_mutex);
const NdbIndexStatImpl::Cache* c = 0;
switch (type) {
case CacheBuild:
@@ -387,6 +388,7 @@ NdbIndexStat::get_cache_info(CacheInfo&
info.m_totalBytes = 0;
info.m_save_time = 0;
info.m_sort_time = 0;
+ info.m_ref_count = 0;
while (c != 0)
{
info.m_count += 1;
@@ -395,10 +397,12 @@ NdbIndexStat::get_cache_info(CacheInfo&
info.m_totalBytes += c->m_keyBytes + c->m_valueBytes + c->m_addrBytes;
info.m_save_time += c->m_save_time;
info.m_sort_time += c->m_sort_time;
+ info.m_ref_count += c->m_ref_count;
c = c->m_nextClean;
}
// build and query cache have at most one instance
require(type == CacheClean || info.m_count <= 1);
+ NdbMutex_Unlock(m_impl.m_query_mutex);
}
// read
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp revid:mikael.ronstrom@stripped
@@ -23,6 +23,7 @@
#include <NdbSqlUtil.hpp>
#include <NdbRecord.hpp>
#include <NdbEventOperation.hpp>
+#include <NdbSleep.h>
#include "NdbIndexStatImpl.hpp"
#undef min
@@ -1460,6 +1461,8 @@ NdbIndexStatImpl::Cache::Cache()
// performance
m_save_time = 0;
m_sort_time = 0;
+ // in use by query_stat
+ m_ref_count = 0;
}
int
@@ -2002,6 +2005,21 @@ NdbIndexStatImpl::convert_range(Range& r
return -1;
}
}
+
+#ifdef VM_TRACE
+ {
+ const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_RANGE_ERROR", (char*)0, 0);
+ if (p != 0 && strchr("1Y", p[0]) != 0)
+ {
+ if (rand() % 10 == 0)
+ {
+ setError(InternalError, __LINE__, NdbIndexStat::InternalError);
+ return -1;
+ }
+ }
+ }
+#endif
+
return 0;
}
@@ -2033,23 +2051,41 @@ int
NdbIndexStatImpl::query_stat(const Range& range, Stat& stat)
{
NdbMutex_Lock(m_query_mutex);
- const Cache* cacheTmp = m_cacheQuery;
- NdbMutex_Unlock(m_query_mutex);
-
- if (unlikely(cacheTmp == 0))
+ if (unlikely(m_cacheQuery == 0))
{
+ NdbMutex_Unlock(m_query_mutex);
setError(UsageError, __LINE__);
return -1;
}
- const Cache& c = *cacheTmp;
+ const Cache& c = *m_cacheQuery;
if (unlikely(!c.m_valid))
{
+ NdbMutex_Unlock(m_query_mutex);
setError(InvalidCache, __LINE__);
return -1;
}
+ c.m_ref_count++;
+ NdbMutex_Unlock(m_query_mutex);
+
+#ifdef VM_TRACE
+ {
+ const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_SLOW_QUERY", (char*)0, 0);
+ if (p != 0 && strchr("1Y", p[0]) != 0)
+ {
+ int ms = 1 + rand() % 20;
+ NdbSleep_MilliSleep(ms);
+ }
+ }
+#endif
+ // clients run these in parallel
query_interpolate(c, range, stat);
query_normalize(c, stat.m_value);
+
+ NdbMutex_Lock(m_query_mutex);
+ assert(c.m_ref_count != 0);
+ c.m_ref_count--;
+ NdbMutex_Unlock(m_query_mutex);
return 0;
}
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp revid:mikael.ronstrom@stripped
@@ -70,7 +70,7 @@ public:
Cache* m_cacheBuild;
Cache* m_cacheQuery;
Cache* m_cacheClean;
- // mutex for query cache switch, memory barrier would do
+ // mutex for query cache switch and reference count
NdbMutex* m_query_mutex;
NdbEventOperation* m_eventOp;
Mem* m_mem_handler;
@@ -185,6 +185,8 @@ public:
// performance
mutable Uint64 m_save_time;
mutable Uint64 m_sort_time;
+ // in use by query_stat
+ mutable uint m_ref_count;
Cache();
// pos is index < sampleCount, addr is offset in keyArray
uint get_keyaddr(uint pos) const;
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/ndberror.c revid:mikael.ronstrom@stripped
@@ -552,6 +552,11 @@ ErrorBundle ErrorCodes[] = {
{ 4718, DMEC, IE, "Index stats samples data or memory cache is invalid" },
{ 4719, DMEC, IE, "Index stats internal error" },
{ 4720, DMEC, AE, "Index stats sys tables " NDB_INDEX_STAT_PREFIX " partly missing or invalid" },
+ { 4721, DMEC, IE, "Mysqld: index stats thread not open for requests" },
+ { 4722, DMEC, IE, "Mysqld: index stats entry unexpectedly not found" },
+ { 4723, DMEC, AE, "Mysqld: index stats request ignored due to recent error" },
+ { 4724, DMEC, AE, "Mysqld: index stats request aborted by stats thread" },
+ { 4725, DMEC, AE, "Index stats were deleted by another process" },
/**
* Still uncategorized
=== modified file 'storage/ndb/src/ndbjtie/MysqlUtilsWrapper.hpp'
--- a/storage/ndb/src/ndbjtie/MysqlUtilsWrapper.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbjtie/MysqlUtilsWrapper.hpp revid:mikael.ronstrom@stripped
@@ -24,6 +24,7 @@
// API to wrap
#include "CharsetMap.hpp"
#include "decimal_utils.hpp"
+#include "dbug_utils.hpp"
struct MysqlUtilsWrapper {
@@ -100,7 +101,42 @@ struct MysqlUtilsWrapper {
{
return ::decimal_bin2str(p0, p1, p2, p3, p4, p5);
}
-
+
+ static void
+ dbugPush
+ ( const char * p0 )
+ {
+ ::dbugPush(p0);
+ }
+
+ static void
+ dbugPop
+ ()
+ {
+ ::dbugPop();
+ }
+
+ static void
+ dbugSet
+ ( const char * p0 )
+ {
+ ::dbugSet(p0);
+ }
+
+ static const char *
+ dbugExplain
+ ( char * p0, int p1 )
+ {
+ return ::dbugExplain(p0, p1);
+ }
+
+ static void
+ dbugPrint
+ ( const char * p0, const char * p1 )
+ {
+ ::dbugPrint(p0, p1);
+ }
+
// ---------------------------------------------------------------------------
};
=== modified file 'storage/ndb/src/ndbjtie/com/mysql/ndbjtie/mysql/Utils.java'
--- a/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/mysql/Utils.java revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/mysql/Utils.java revid:mikael.ronstrom@stripped
@@ -1,20 +1,20 @@
/*
- Copyright 2010 Sun Microsystems, Inc.
- All rights reserved. Use is subject to license terms.
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
+ * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-*/
/*
* Utils.java
*/
@@ -36,4 +36,9 @@ public class Utils
//static public final int E_DEC_FATAL_ERROR = 30; // MMM not used at this time?
static public final native int decimal_str2bin(ByteBuffer/*_const char *_*/ str, int str_len, int prec, int scale, ByteBuffer/*_void *_*/ dest, int buf_len);
static public final native int decimal_bin2str(ByteBuffer/*_const void *_*/ bin, int bin_len, int prec, int scale, ByteBuffer/*_char *_*/ dest, int buf_len);
+ static public final native void dbugPush(String state);
+ static public final native void dbugPop();
+ static public final native void dbugSet(String state);
+ static public final native String dbugExplain(ByteBuffer buffer, int length);
+ static public final native void dbugPrint(String keyword, String message);
}
=== modified file 'storage/ndb/src/ndbjtie/mysql_utils_jtie.hpp'
--- a/storage/ndb/src/ndbjtie/mysql_utils_jtie.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbjtie/mysql_utils_jtie.hpp revid:mikael.ronstrom@stripped
@@ -243,6 +243,86 @@ Java_com_mysql_ndbjtie_mysql_Utils_decim
#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
}
+/*
+ * Class: com_mysql_ndbjtie_mysql_Utils
+ * Method: dbugPush
+ * Signature: (Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugPush(JNIEnv * env, jclass cls, jstring p0)
+{
+ TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugPush(JNIEnv *, jclass, jstring)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+ gcall_fv< ttrait_char_cp_jutf8null, &::dbugPush >(env, cls, p0);
+#else
+ gcall_fv< ttrait_char_cp_jutf8null, &MysqlUtilsWrapper::dbugPush >(env, cls, p0);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class: com_mysql_ndbjtie_mysql_Utils
+ * Method: dbugPop
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugPop(JNIEnv * env, jclass cls)
+{
+ TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugPop(JNIEnv *, jclass)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+ gcall_fv< &::dbugPop >(env, cls);
+#else
+ gcall_fv< &MysqlUtilsWrapper::dbugPop >(env, cls);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class: com_mysql_ndbjtie_mysql_Utils
+ * Method: dbugSet
+ * Signature: (Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugSet(JNIEnv * env, jclass cls, jstring p0)
+{
+ TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugSet(JNIEnv *, jclass, jstring)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+ gcall_fv< ttrait_char_cp_jutf8null, &::dbugSet >(env, cls, p0);
+#else
+ gcall_fv< ttrait_char_cp_jutf8null, &MysqlUtilsWrapper::dbugSet >(env, cls, p0);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class: com_mysql_ndbjtie_mysql_Utils
+ * Method: dbugExplain
+ * Signature: (Ljava/nio/ByteBuffer;I)Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugExplain(JNIEnv * env, jclass cls, jobject p0, jint p1)
+{
+ TRACE("jstring Java_com_mysql_ndbjtie_mysql_Utils_dbugExplain(JNIEnv *, jclass, jobject, jint)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+ return gcall_fr< ttrait_char_cp_jutf8null, ttrait_char_0p_bb, ttrait_int, &::dbugExplain >(env, cls, p0, p1);
+#else
+ return gcall_fr< ttrait_char_cp_jutf8null, ttrait_char_0p_bb, ttrait_int, &MysqlUtilsWrapper::dbugExplain >(env, cls, p0, p1);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class: com_mysql_ndbjtie_mysql_Utils
+ * Method: dbugPrint
+ * Signature: (Ljava/lang/String;Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugPrint(JNIEnv * env, jclass cls, jstring p0, jstring p1)
+{
+ TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugPrint(JNIEnv *, jclass, jstring, jstring)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+ gcall_fv< ttrait_char_cp_jutf8null, ttrait_char_cp_jutf8null, &::dbugPrint >(env, cls, p0, p1);
+#else
+ gcall_fv< ttrait_char_cp_jutf8null, ttrait_char_cp_jutf8null, &MysqlUtilsWrapper::dbugPrint >(env, cls, p0, p1);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
// ---------------------------------------------------------------------------
} // extern "C"
=== modified file 'storage/ndb/test/include/NDBT_Test.hpp'
--- a/storage/ndb/test/include/NDBT_Test.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/include/NDBT_Test.hpp revid:mikael.ronstrom@stripped
@@ -100,6 +100,13 @@ public:
*/
void sync_down(const char * key);
void sync_up_and_wait(const char * key, Uint32 count = 0);
+
+ /**
+ * safety for slow machines...
+ * 0 means no safety
+ */
+ bool closeToTimeout(int safety_percent = 0);
+
private:
friend class NDBT_Step;
friend class NDBT_TestSuite;
@@ -120,6 +127,9 @@ private:
Properties props;
NdbMutex* propertyMutexPtr;
NdbCondition* propertyCondPtr;
+
+ int m_env_timeout;
+ Uint64 m_test_start_time;
};
typedef int (NDBT_TESTFUNC)(NDBT_Context*, NDBT_Step*);
=== modified file 'storage/ndb/test/include/NdbRestarts.hpp'
--- a/storage/ndb/test/include/NdbRestarts.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/include/NdbRestarts.hpp revid:mikael.ronstrom@stripped
@@ -56,7 +56,8 @@ public:
};
struct NdbRestart {
- typedef int (restartFunc)(NDBT_Context*, NdbRestarter&, const NdbRestart*);
+ typedef int (restartFunc)(NDBT_Context*, NdbRestarter&, const NdbRestart*,
+ int safety);
NdbRestart(const char* _name,
NdbRestartType _type,
@@ -74,13 +75,16 @@ public:
int getNumRestarts();
- int executeRestart(NDBT_Context*, int _num, unsigned int _to = 120);
- int executeRestart(NDBT_Context*, const char* _name, unsigned int _to = 120);
+ int executeRestart(NDBT_Context*, int _num,
+ unsigned int _to = 120, int safety = 0);
+ int executeRestart(NDBT_Context*, const char* _name,
+ unsigned int _to = 120, int safety = 0);
void listRestarts();
void listRestarts(NdbRestartType _type);
private:
- int executeRestart(NDBT_Context*, const NdbRestart*, unsigned int _timeout);
+ int executeRestart(NDBT_Context*, const NdbRestart*,
+ unsigned int _timeout, int safety);
struct NdbErrorInsert {
NdbErrorInsert(const char* _name,
=== modified file 'storage/ndb/test/ndbapi/testDict.cpp'
--- a/storage/ndb/test/ndbapi/testDict.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/ndbapi/testDict.cpp revid:mikael.ronstrom@stripped
@@ -9007,9 +9007,10 @@ runIndexStatCreate(NDBT_Context* ctx, ND
* OK
*/
}
- else if (! (is.getNdbError().code == 721 ||
- is.getNdbError().code == 4244 ||
- is.getNdbError().code == 4009)) // no connection
+ else if (! (is.getNdbError().code == 701 || // timeout
+ is.getNdbError().code == 721 || // already exists
+ is.getNdbError().code == 4244 || // already exists
+ is.getNdbError().code == 4009)) // no connection
{
ndbout << is.getNdbError() << endl;
return NDBT_FAILED;
=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp revid:mikael.ronstrom@stripped
@@ -450,13 +450,20 @@ int runRestarts(NDBT_Context* ctx, NDBT_
int i = 0;
int timeout = 240;
- while(i<loops && result != NDBT_FAILED && !ctx->isTestStopped()){
-
- if(restarts.executeRestart(ctx, pCase->getName(), timeout) != 0){
+ while (i<loops && result != NDBT_FAILED && !ctx->isTestStopped())
+ {
+ int safety = 0;
+ if (i > 0)
+ safety = 15;
+
+ if (ctx->closeToTimeout(safety))
+ break;
+
+ if(restarts.executeRestart(ctx, pCase->getName(), timeout, safety) != 0){
g_err << "Failed to executeRestart(" <<pCase->getName() <<")" << endl;
result = NDBT_FAILED;
break;
- }
+ }
i++;
}
ctx->stopTest();
=== modified file 'storage/ndb/test/run-test/atrt.hpp'
--- a/storage/ndb/test/run-test/atrt.hpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/atrt.hpp revid:mikael.ronstrom@stripped
@@ -118,9 +118,15 @@ struct atrt_testcase
bool m_report;
bool m_run_all;
time_t m_max_time;
- BaseString m_command;
- BaseString m_args;
BaseString m_name;
+ BaseString m_mysqld_options;
+
+ struct Command
+ {
+ atrt_process::Type m_cmd_type;
+ BaseString m_exe;
+ BaseString m_args;
+ } m_cmd; // Todo make array of these...
};
extern Logger g_logger;
@@ -155,6 +161,9 @@ bool do_command(atrt_config& config);
bool start_process(atrt_process & proc);
bool stop_process(atrt_process & proc);
+bool connect_mysqld(atrt_process & proc);
+bool disconnect_mysqld(atrt_process & proc);
+
/**
* check configuration if any changes has been
* done for the duration of the latest running test
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt revid:mikael.ronstrom@stripped
@@ -12,7 +12,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-max-time: 900
+max-time: 1800
cmd: testIndex
args: -n DeferredError
@@ -20,7 +20,7 @@ max-time: 900
cmd: testIndex
args: -n DeferredMixedLoad T1 T6 T13
-max-time: 1800
+max-time: 2000
cmd: testIndex
args: -n DeferredMixedLoadError T1 T6 T13
@@ -28,7 +28,7 @@ max-time: 900
cmd: testIndex
args: -n NF_DeferredMixed T1 T6 T13
-max-time: 900
+max-time: 1800
cmd: testIndex
args: -n NF_Mixed T1 T6 T13
@@ -1751,7 +1751,7 @@ max-time: 300
cmd: testBasic
args: -n UnlockUpdateBatch T3
-max-time: 300
+max-time: 600
cmd: testNodeRestart
args: -n MixReadUnlockRestart T1
=== modified file 'storage/ndb/test/run-test/daily-perf-tests.txt'
--- a/storage/ndb/test/run-test/daily-perf-tests.txt revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/daily-perf-tests.txt revid:mikael.ronstrom@stripped
@@ -79,62 +79,72 @@ type: bench
# sql
max-time: 600
-client: ndb-sql-perf-create-table.sh
-args: t1
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-create-table.sh t1
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-select.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-select.sh t1 1 64
mysqld: --ndb-cluster-connection-pool=1
type: bench
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-select.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-select.sh t1 1 64
mysqld: --ndb-cluster-connection-pool=4
type: bench
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-update.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-update.sh t1 1 64
mysqld: --ndb-cluster-connection-pool=1
type: bench
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-update.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-update.sh t1 1 64
mysqld: --ndb-cluster-connection-pool=4
type: bench
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-drop-table.sh
-args: t1
-mysqld:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-drop-table.sh t1
+cmd-type: mysql
# sql join
max-time: 600
-client: ndb-sql-perf-load-tpcw.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-load-tpcw.sh
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-tpcw-getBestSeller.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-tpcw-getBestSeller.sh
+cmd-type: mysql
type: bench
max-time: 600
-client: ndb-sql-perf-drop-tpcw.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-drop-tpcw.sh
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-load-music-store.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-load-music-store.sh
+cmd-type: mysql
max-time: 600
-client: ndb-sql-perf-select-music-store.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-select-music-store.sh
+cmd-type: mysql
type: bench
max-time: 600
-client: ndb-sql-perf-drop-music-store.sh
-args:
-
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-drop-music-store.sh
+cmd-type: mysql
=== modified file 'storage/ndb/test/run-test/db.cpp'
--- a/storage/ndb/test/run-test/db.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/db.cpp revid:mikael.ronstrom@stripped
@@ -18,7 +18,6 @@
#include "atrt.hpp"
#include <NdbSleep.h>
-static bool connect_mysqld(atrt_process* proc);
static bool populate_db(atrt_config&, atrt_process*);
static bool setup_repl(atrt_config&);
@@ -132,7 +131,7 @@ setup_db(atrt_config& config)
atrt_process * proc = config.m_processes[i];
if (proc->m_type == atrt_process::AP_MYSQLD)
{
- if (!connect_mysqld(config.m_processes[i]))
+ if (!connect_mysqld(* config.m_processes[i]))
return false;
}
}
@@ -177,16 +176,16 @@ find(atrt_process* proc, const char * ke
}
bool
-connect_mysqld(atrt_process* proc)
+connect_mysqld(atrt_process& proc)
{
- if ( !mysql_init(&proc->m_mysql))
+ if ( !mysql_init(&proc.m_mysql))
{
g_logger.error("Failed to init mysql");
return false;
}
- const char * port = find(proc, "--port=");
- const char * socket = find(proc, "--socket=");
+ const char * port = find(&proc, "--port=");
+ const char * socket = find(&proc, "--socket=");
if (port == 0 && socket == 0)
{
g_logger.error("Neither socket nor port specified...cant connect to mysql");
@@ -198,10 +197,10 @@ connect_mysqld(atrt_process* proc)
if (port)
{
mysql_protocol_type val = MYSQL_PROTOCOL_TCP;
- mysql_options(&proc->m_mysql, MYSQL_OPT_PROTOCOL, &val);
+ mysql_options(&proc.m_mysql, MYSQL_OPT_PROTOCOL, &val);
}
- if (mysql_real_connect(&proc->m_mysql,
- proc->m_host->m_hostname.c_str(),
+ if (mysql_real_connect(&proc.m_mysql,
+ proc.m_host->m_hostname.c_str(),
"root", "", "test",
port ? atoi(port) : 0,
socket,
@@ -210,17 +209,24 @@ connect_mysqld(atrt_process* proc)
return true;
}
g_logger.info("Retrying connect to %s:%u 3s",
- proc->m_host->m_hostname.c_str(),atoi(port));
- NdbSleep_SecSleep(3);
+ proc.m_host->m_hostname.c_str(),atoi(port));
+ NdbSleep_SecSleep(3);
}
-
+
g_logger.error("Failed to connect to mysqld err: >%s< >%s:%u:%s<",
- mysql_error(&proc->m_mysql),
- proc->m_host->m_hostname.c_str(), port ? atoi(port) : 0,
+ mysql_error(&proc.m_mysql),
+ proc.m_host->m_hostname.c_str(), port ? atoi(port) : 0,
socket ? socket : "<null>");
return false;
}
+bool
+disconnect_mysqld(atrt_process& proc)
+{
+ mysql_close(&proc.m_mysql);
+ return true;
+}
+
void
BINDI(MYSQL_BIND& bind, int * i)
{
=== modified file 'storage/ndb/test/run-test/main.cpp'
--- a/storage/ndb/test/run-test/main.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/main.cpp revid:mikael.ronstrom@stripped
@@ -422,17 +422,17 @@ main(int argc, char ** argv)
if(!read_test_case(g_test_case_file, test_case, lineno))
goto end;
- g_logger.info("#%d - %s %s",
+ g_logger.info("#%d - %s",
test_no,
- test_case.m_command.c_str(), test_case.m_args.c_str());
-
+ test_case.m_name.c_str());
+
// Assign processes to programs
- if(!setup_test_case(g_config, test_case))
+ if (!setup_test_case(g_config, test_case))
{
g_logger.critical("Failed to setup test case");
goto end;
}
-
+
if(!start_processes(g_config, p_clients))
{
g_logger.critical("Failed to start client processes");
@@ -1091,6 +1091,11 @@ stop_process(atrt_process & proc){
return true;
}
+ if (proc.m_type == atrt_process::AP_MYSQLD)
+ {
+ disconnect_mysqld(proc);
+ }
+
{
Properties reply;
if(proc.m_host->m_cpcd->stop_process(proc.m_proc.m_id, reply) != 0){
@@ -1248,24 +1253,24 @@ read_test_case(FILE * file, atrt_testcas
tmp.trim(" \t\n\r");
Vector<BaseString> split;
tmp.split(split, " ", 2);
- tc.m_command = split[0];
+ tc.m_cmd.m_exe = split[0];
if(split.size() == 2)
- tc.m_args = split[1];
+ tc.m_cmd.m_args = split[1];
else
- tc.m_args = "";
+ tc.m_cmd.m_args = "";
tc.m_max_time = 60000;
return true;
}
return false;
}
- if(!p.get("cmd", tc.m_command)){
+ if(!p.get("cmd", tc.m_cmd.m_exe)){
g_logger.critical("Invalid test file: cmd is missing near line: %d", line);
return false;
}
- if(!p.get("args", tc.m_args))
- tc.m_args = "";
+ if(!p.get("args", tc.m_cmd.m_args))
+ tc.m_cmd.m_args = "";
const char * mt = 0;
if(!p.get("max-time", &mt))
@@ -1283,17 +1288,33 @@ read_test_case(FILE * file, atrt_testcas
else
tc.m_run_all= false;
+ const char * str;
+ if (p.get("mysqld", &str))
+ {
+ tc.m_mysqld_options.assign(str);
+ }
+ else
+ {
+ tc.m_mysqld_options.assign("");
+ }
+
+ tc.m_cmd.m_cmd_type = atrt_process::AP_NDB_API;
+ if (p.get("cmd-type", &str) && strcmp(str, "mysql") == 0)
+ {
+ tc.m_cmd.m_cmd_type = atrt_process::AP_CLIENT;
+ }
+
if (!p.get("name", &mt))
{
tc.m_name.assfmt("%s %s",
- tc.m_command.c_str(),
- tc.m_args.c_str());
+ tc.m_cmd.m_exe.c_str(),
+ tc.m_cmd.m_args.c_str());
}
else
{
tc.m_name.assign(mt);
}
-
+
return true;
}
@@ -1306,46 +1327,87 @@ setup_test_case(atrt_config& config, con
return false;
}
- size_t i = 0;
- for(; i<config.m_processes.size(); i++)
+ for (size_t i = 0; i<config.m_processes.size(); i++)
{
- atrt_process & proc = *config.m_processes[i];
- if(proc.m_type == atrt_process::AP_NDB_API ||
- proc.m_type == atrt_process::AP_CLIENT)
+ atrt_process & proc = *config.m_processes[i];
+ if (proc.m_type == atrt_process::AP_NDB_API ||
+ proc.m_type == atrt_process::AP_CLIENT)
{
- BaseString cmd;
- char * p = find_bin_path(tc.m_command.c_str());
- if (p == 0)
- {
- g_logger.critical("Failed to locate '%s'", tc.m_command.c_str());
- return false;
- }
- cmd.assign(p);
- free(p);
+ proc.m_proc.m_path.assign("");
+ proc.m_proc.m_args.assign("");
+ }
+ }
+ BaseString cmd;
+ char * p = find_bin_path(tc.m_cmd.m_exe.c_str());
+ if (p == 0)
+ {
+ g_logger.critical("Failed to locate '%s'", tc.m_cmd.m_exe.c_str());
+ return false;
+ }
+ cmd.assign(p);
+ free(p);
+
+ for (size_t i = 0; i<config.m_processes.size(); i++)
+ {
+ atrt_process & proc = *config.m_processes[i];
+ if (proc.m_type == tc.m_cmd.m_cmd_type &&
+ proc.m_proc.m_path == "")
+ {
+ proc.m_save.m_proc = proc.m_proc;
+ proc.m_save.m_saved = true;
+
+ proc.m_proc.m_env.appfmt(" ATRT_TIMEOUT=%ld", tc.m_max_time);
if (0) // valgrind
{
proc.m_proc.m_path = "/usr/bin/valgrind";
- proc.m_proc.m_args.appfmt("%s %s", cmd.c_str(), tc.m_args.c_str());
+ proc.m_proc.m_args.appfmt("%s %s", cmd.c_str(),
+ tc.m_cmd.m_args.c_str());
}
else
{
proc.m_proc.m_path = cmd;
- proc.m_proc.m_args.assign(tc.m_args);
+ proc.m_proc.m_args.assign(tc.m_cmd.m_args.c_str());
}
- if(!tc.m_run_all)
+ if (!tc.m_run_all)
break;
}
}
- for(i++; i<config.m_processes.size(); i++){
- atrt_process & proc = *config.m_processes[i];
- if(proc.m_type == atrt_process::AP_NDB_API ||
- proc.m_type == atrt_process::AP_CLIENT)
+
+ if (tc.m_mysqld_options != "")
+ {
+ g_logger.info("restarting mysqld with extra options: %s",
+ tc.m_mysqld_options.c_str());
+
+ /**
+ * Apply testcase specific mysqld options
+ */
+ for (size_t i = 0; i<config.m_processes.size(); i++)
{
- proc.m_proc.m_path.assign("");
- proc.m_proc.m_args.assign("");
+ atrt_process & proc = *config.m_processes[i];
+ if (proc.m_type == atrt_process::AP_MYSQLD)
+ {
+ proc.m_save.m_proc = proc.m_proc;
+ proc.m_save.m_saved = true;
+ proc.m_proc.m_args.appfmt(" %s", tc.m_mysqld_options.c_str());
+ if (!stop_process(proc))
+ {
+ return false;
+ }
+
+ if (!start_process(proc))
+ {
+ return false;
+ }
+
+ if (!connect_mysqld(proc))
+ {
+ return false;
+ }
+ }
}
}
+
return true;
}
@@ -1562,10 +1624,14 @@ reset_config(atrt_config & config)
atrt_process & proc = *config.m_processes[i];
if (proc.m_save.m_saved)
{
- if (!stop_process(proc))
- return false;
-
- changed = true;
+ if (proc.m_proc.m_status == "running")
+ {
+ if (!stop_process(proc))
+ return false;
+
+ changed = true;
+ }
+
proc.m_save.m_saved = false;
proc.m_proc = proc.m_save.m_proc;
proc.m_proc.m_id = -1;
=== modified file 'storage/ndb/test/run-test/test-tests.txt'
--- a/storage/ndb/test/run-test/test-tests.txt revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/test-tests.txt revid:mikael.ronstrom@stripped
@@ -1,4 +1,17 @@
max-time: 600
cmd: testBasic
args: -n PkRead T1
+type: bench
+
+max-time: 60
+cmd: mysql
+args: -u root information_schema -e show\ tables
+cmd-type: mysql
+type: bench
+mysqld: --ndb-cluster-connection-pool=4
+
+max-time: 6000
+cmd: testBasic
+args: -n PkRead T1
+type: bench
=== modified file 'storage/ndb/test/src/NDBT_Test.cpp'
--- a/storage/ndb/test/src/NDBT_Test.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/src/NDBT_Test.cpp revid:mikael.ronstrom@stripped
@@ -20,6 +20,7 @@
#include "NDBT.hpp"
#include "NDBT_Test.hpp"
+#include <portlib/NdbEnv.h>
static int opt_stop_on_error = 0;
@@ -34,9 +35,10 @@ NDBT_Context::NDBT_Context(Ndb_cluster_c
stopped = false;
propertyMutexPtr = NdbMutex_Create();
propertyCondPtr = NdbCondition_Create();
+ m_env_timeout = 0;
+ m_test_start_time = NdbTick_CurrentMillisecond();
}
-
NDBT_Context::~NDBT_Context(){
NdbCondition_Destroy(propertyCondPtr);
NdbMutex_Destroy(propertyMutexPtr);
@@ -318,7 +320,6 @@ NDBT_Step::tearDown(){
m_ndb = NULL;
}
-
Ndb* NDBT_Step::getNdb() const {
assert(m_ndb != NULL);
return m_ndb;
@@ -970,8 +971,8 @@ NDBT_TestSuite::executeOneCtx(Ndb_cluste
if (opt_stop_on_error != 0 && numTestsFail > 0)
break;
- if (_testname != NULL &&
- strcasecmp(tests[t]->getName(), _testname) != 0)
+ if (_testname != NULL &&
+ strcasecmp(tests[t]->getName(), _testname) != 0)
continue;
tests[t]->initBeforeTest();
@@ -1648,6 +1649,42 @@ NDBT_Context::sync_up_and_wait(const cha
getPropertyWait(key, (unsigned)0);
}
+bool
+NDBT_Context::closeToTimeout(int safety)
+{
+ if (safety == 0)
+ return false;
+
+ if (m_env_timeout == 0)
+ {
+ char buf[1024];
+ const char * p = NdbEnv_GetEnv("ATRT_TIMEOUT", buf, sizeof(buf));
+ if (p)
+ {
+ m_env_timeout = atoi(p);
+ ndbout_c("FOUND ATRT_TIMEOUT: %d", m_env_timeout);
+ }
+ else
+ {
+ m_env_timeout = -1;
+ }
+ }
+
+ if (m_env_timeout < 0)
+ return false;
+
+ Uint64 to = (1000 * m_env_timeout * (100 - safety)) / 100;
+ Uint64 now = NdbTick_CurrentMillisecond();
+ if (now >= m_test_start_time + to)
+ {
+ ndbout_c("closeToTimeout(%d) => true env(timeout): %d",
+ safety, m_env_timeout);
+ return true;
+ }
+
+ return false;
+}
+
template class Vector<NDBT_TestCase*>;
template class Vector<NDBT_TestCaseResult*>;
template class Vector<NDBT_Step*>;
=== modified file 'storage/ndb/test/src/NdbRestarts.cpp'
--- a/storage/ndb/test/src/NdbRestarts.cpp revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/src/NdbRestarts.cpp revid:mikael.ronstrom@stripped
@@ -24,7 +24,7 @@
#include <NdbEnv.h>
#include <NDBT_Test.hpp>
-#define F_ARGS NDBT_Context* ctx, NdbRestarter& _restarter, const NdbRestarts::NdbRestart* _restart
+#define F_ARGS NDBT_Context* ctx, NdbRestarter& _restarter, const NdbRestarts::NdbRestart* _restart, int
int restartRandomNodeGraceful(F_ARGS);
int restartRandomNodeAbort(F_ARGS);
@@ -242,7 +242,8 @@ const NdbRestarts::NdbRestart* NdbRestar
int NdbRestarts::executeRestart(NDBT_Context* ctx,
const NdbRestarts::NdbRestart* _restart,
- unsigned int _timeout){
+ unsigned int _timeout,
+ int safety){
// Check that there are enough nodes in the cluster
// for this test
NdbRestarter restarter;
@@ -258,7 +259,7 @@ int NdbRestarts::executeRestart(NDBT_Con
return NDBT_FAILED;
}
- int res = _restart->m_restartFunc(ctx, restarter, _restart);
+ int res = _restart->m_restartFunc(ctx, restarter, _restart, safety);
// Sleep a little waiting for nodes to react to command
NdbSleep_SecSleep(2);
@@ -280,23 +281,25 @@ int NdbRestarts::executeRestart(NDBT_Con
int NdbRestarts::executeRestart(NDBT_Context* ctx,
int _num,
- unsigned int _timeout){
+ unsigned int _timeout,
+ int safety){
const NdbRestarts::NdbRestart* r = getRestart(_num);
if (r == NULL)
return NDBT_FAILED;
- int res = executeRestart(ctx, r, _timeout);
+ int res = executeRestart(ctx, r, _timeout, safety);
return res;
}
int NdbRestarts::executeRestart(NDBT_Context* ctx,
const char* _name,
- unsigned int _timeout){
+ unsigned int _timeout,
+ int safety){
const NdbRestarts::NdbRestart* r = getRestart(_name);
if (r == NULL)
return NDBT_FAILED;
- int res = executeRestart(ctx, r, _timeout);
+ int res = executeRestart(ctx, r, _timeout, safety);
return res;
}
@@ -658,12 +661,12 @@ NFDuringNR_codes[] = {
5002
};
-int restartNFDuringNR(F_ARGS){
+int restartNFDuringNR(F_ARGS safety){
myRandom48Init((long)NdbTick_CurrentMillisecond());
int i;
const int sz = sizeof(NFDuringNR_codes)/sizeof(NFDuringNR_codes[0]);
- for(i = 0; i<sz; i++){
+ for(i = 0; i<sz && !ctx->closeToTimeout(safety); i++){
int randomId = myRandom48(_restarter.getNumDbNodes());
int nodeId = _restarter.getDbNodeId(randomId);
int error = NFDuringNR_codes[i];
@@ -708,7 +711,7 @@ int restartNFDuringNR(F_ARGS){
if(NdbEnv_GetEnv("USER", buf, 256) == 0 || strcmp(buf, "ejonore") != 0)
return NDBT_OK;
- for(i = 0; i<sz && !ctx->isTestStopped(); i++){
+ for(i = 0; i<sz && !ctx->isTestStopped() && !ctx->closeToTimeout(safety);i++){
const int randomId = myRandom48(_restarter.getNumDbNodes());
int nodeId = _restarter.getDbNodeId(randomId);
const int error = NFDuringNR_codes[i];
@@ -786,7 +789,7 @@ NRDuringLCP_NonMaster_codes[] = {
7018 // Crash in !master when changing state to LCP_TAB_SAVED
};
-int restartNodeDuringLCP(F_ARGS) {
+int restartNodeDuringLCP(F_ARGS safety) {
int i;
// Master
int val = DumpStateOrd::DihMinTimeBetweenLCP;
@@ -794,8 +797,8 @@ int restartNodeDuringLCP(F_ARGS) {
"Failed to set LCP to min value"); // Set LCP to min val
int sz = sizeof(NRDuringLCP_Master_codes)/
sizeof(NRDuringLCP_Master_codes[0]);
- for(i = 0; i<sz; i++) {
-
+ for(i = 0; i<sz && !ctx->closeToTimeout(safety); i++)
+ {
int error = NRDuringLCP_Master_codes[i];
int masterNodeId = _restarter.getMasterNodeId();
@@ -832,7 +835,7 @@ int restartNodeDuringLCP(F_ARGS) {
// NON-Master
sz = sizeof(NRDuringLCP_NonMaster_codes)/
sizeof(NRDuringLCP_NonMaster_codes[0]);
- for(i = 0; i<sz; i++) {
+ for(i = 0; i<sz && !ctx->closeToTimeout(safety) ; i++) {
int error = NRDuringLCP_NonMaster_codes[i];
int nodeId = getRandomNodeId(_restarter);
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3660) | Mikael Ronstrom | 21 Dec |