3415 magnus.blaudd@stripped 2011-11-21 [merge]
Merge 7.2 -> trunk-cluster
added:
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/blocks/trpman.hpp
modified:
mysql-test/suite/ndb/r/ndb_index_stat.result
mysql-test/suite/ndb/t/ndb_index_stat.test
mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result
mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test
scripts/mysql_system_tables.sql
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/log_event.cc
sql/mysqld.h
sql/ndb_event_data.cc
sql/ndb_event_data.h
sql/ndb_schema_dist.cc
sql/ndb_schema_dist.h
sql/ndb_share.cc
sql/ndb_share.h
sql/sys_vars.cc
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/src/common/debugger/EventLogger.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
storage/ndb/src/kernel/blocks/trix/Trix.cpp
storage/ndb/src/kernel/ndbd.cpp
storage/ndb/src/kernel/vm/Ndbinfo.hpp
storage/ndb/tools/ndbinfo_sql.cpp
3414 Martin Skold 2011-11-17 [merge]
Null merge
modified:
mysql-test/suite/ndb/r/ndb_condition_pushdown.result
mysql-test/suite/ndb/t/ndb_condition_pushdown.test
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-11-16 09:29:49 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result 2011-11-21 12:53:01 +0000
@@ -548,6 +548,23 @@ SELECT count(*) as Count FROM t1 WHERE L
Count
256
drop table t1;
+create table t1 (
+a int unsigned not null,
+b char(180) not null,
+primary key using hash (a),
+index (b)
+) engine=ndb charset=binary;
+insert into t1 values (1,'a'),(2,'b'),(3,'c');
+analyze table t1;
+Table Op Msg_type Msg_text
+test.t1 analyze status OK
+analyze table t1;
+Table Op Msg_type Msg_text
+test.t1 analyze status OK
+analyze table t1;
+Table Op Msg_type Msg_text
+test.t1 analyze status OK
+drop table t1;
set @is_enable = @is_enable_default;
set @is_enable = NULL;
# is_enable_on=0 is_enable_off=1
=== modified file 'mysql-test/suite/ndb/t/ndb_index_stat.test'
--- a/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-09-02 06:43:38 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat.test 2011-11-19 07:56:25 +0000
@@ -374,5 +374,20 @@ SELECT count(*) as Count FROM t1 WHERE L
drop table t1;
+# bug#13407848
+# signed char in compute length bytes caused ndbrequire in Trix.cpp
+
+create table t1 (
+ a int unsigned not null,
+ b char(180) not null,
+ primary key using hash (a),
+ index (b)
+) engine=ndb charset=binary;
+insert into t1 values (1,'a'),(2,'b'),(3,'c');
+analyze table t1;
+analyze table t1;
+analyze table t1;
+drop table t1;
+
set @is_enable = @is_enable_default;
source ndb_index_stat_enable.inc;
=== modified file 'mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result'
--- a/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result 2011-11-15 14:25:58 +0000
+++ b/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result 2011-11-21 12:53:01 +0000
@@ -3,22 +3,10 @@ include/master-slave.inc
show variables like 'slave_allow_batching';
Variable_name Value
slave_allow_batching OFF
-Show that slave_allow_batching cannot be changed while slave is running
-set global slave_allow_batching=ON;
-ERROR HY000: This operation cannot be performed with a running slave; run STOP SLAVE first
-show warnings;
-Level Code Message
-Error 1198 This operation cannot be performed with a running slave; run STOP SLAVE first
-show variables like 'slave_allow_batching';
-Variable_name Value
-slave_allow_batching OFF
-Now stop slave and change it
-stop slave;
set global slave_allow_batching=ON;
show variables like 'slave_allow_batching';
Variable_name Value
slave_allow_batching ON
-start slave;
Now the normal test
CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1)) ENGINE = 'INNODB' ;
SELECT * FROM t1;
@@ -501,6 +489,7 @@ SET GLOBAL SLAVE_TYPE_CONVERSIONS = @sav
call mtr.add_suppression("Slave SQL.*Table definition on master and slave does not match: Column 1 size mismatch.* Error_code: 1535");
call mtr.add_suppression("Slave SQL.*Could not execute Delete_rows event on table test.t1.* Error_code: 1032");
call mtr.add_suppression("Slave SQL.*Column 1 of table .test.t.. cannot be converted from type.*, Error_code: 1677");
+call mtr.add_suppression("The slave coordinator and worker threads are stopped, possibly leaving data in inconsistent state");
include/rpl_reset.inc
[expecting slave to replicate correctly]
INSERT INTO t4 VALUES (1, "", 1);
=== modified file 'mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test'
--- a/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test 2011-11-15 14:25:58 +0000
+++ b/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test 2011-11-18 09:46:57 +0000
@@ -4,18 +4,9 @@
--connection slave
show variables like 'slave_allow_batching';
---echo Show that slave_allow_batching cannot be changed while slave is running
---error ER_SLAVE_MUST_STOP
set global slave_allow_batching=ON;
-show warnings;
show variables like 'slave_allow_batching';
---echo Now stop slave and change it
-stop slave;
-set global slave_allow_batching=ON;
-show variables like 'slave_allow_batching';
-start slave;
-
--echo Now the normal test
--connection master
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql 2011-11-15 13:13:38 +0000
+++ b/scripts/mysql_system_tables.sql 2011-11-21 12:53:01 +0000
@@ -1837,7 +1837,7 @@ EXECUTE stmt;
DROP PREPARE stmt;
# ndbinfo.counters
-SET @str=IF(@have_ndbinfo,'CREATE OR REPLACE DEFINER=`root@localhost` SQL SECURITY INVOKER VIEW `ndbinfo`.`counters` AS SELECT node_id, b.block_name, block_instance, counter_id, CASE counter_id WHEN 1 THEN "ATTRINFO" WHEN 2 THEN "TRANSACTIONS" WHEN 3 THEN "COMMITS" WHEN 4 THEN "READS" WHEN 5 THEN "SIMPLE_READS" WHEN 6 THEN "WRITES" WHEN 7 THEN "ABORTS" WHEN 8 THEN "TABLE_SCANS" WHEN 9 THEN "RANGE_SCANS" WHEN 10 THEN "OPERATIONS" WHEN 11 THEN "READS_RECEIVED" WHEN 12 THEN "LOCAL_READS_SENT" WHEN 13 THEN "REMOTE_READS_SENT" WHEN 14 THEN "READS_NOT_FOUND" WHEN 15 THEN "TABLE_SCANS_RECEIVED" WHEN 16 THEN "LOCAL_TABLE_SCANS_SENT" WHEN 17 THEN "RANGE_SCANS_RECEIVED" WHEN 18 THEN "LOCAL_RANGE_SCANS_SENT" WHEN 19 THEN "REMOTE_RANGE_SCANS_SENT" WHEN 20 THEN "SCAN_BATCHES_RETURNED" WHEN 21 THEN "SCAN_ROWS_RETURNED" WHEN 22 THEN "PRUNED_RANGE_SCANS_RECEIVED" WHEN 23 THEN "CONST_PRUNED_RANGE_SCANS_RECEIVED" ELSE "<unknown>" END AS counter_name, val FROM `ndbinf!
o`.`ndb$counters` c LEFT JOIN `ndbinfo`.blocks b ON c.block_number = b.block_number','SET @dummy = 0');
+SET @str=IF(@have_ndbinfo,'CREATE OR REPLACE DEFINER=`root@localhost` SQL SECURITY INVOKER VIEW `ndbinfo`.`counters` AS SELECT node_id, b.block_name, block_instance, counter_id, CASE counter_id WHEN 1 THEN "ATTRINFO" WHEN 2 THEN "TRANSACTIONS" WHEN 3 THEN "COMMITS" WHEN 4 THEN "READS" WHEN 5 THEN "SIMPLE_READS" WHEN 6 THEN "WRITES" WHEN 7 THEN "ABORTS" WHEN 8 THEN "TABLE_SCANS" WHEN 9 THEN "RANGE_SCANS" WHEN 10 THEN "OPERATIONS" WHEN 11 THEN "READS_RECEIVED" WHEN 12 THEN "LOCAL_READS_SENT" WHEN 13 THEN "REMOTE_READS_SENT" WHEN 14 THEN "READS_NOT_FOUND" WHEN 15 THEN "TABLE_SCANS_RECEIVED" WHEN 16 THEN "LOCAL_TABLE_SCANS_SENT" WHEN 17 THEN "RANGE_SCANS_RECEIVED" WHEN 18 THEN "LOCAL_RANGE_SCANS_SENT" WHEN 19 THEN "REMOTE_RANGE_SCANS_SENT" WHEN 20 THEN "SCAN_BATCHES_RETURNED" WHEN 21 THEN "SCAN_ROWS_RETURNED" WHEN 22 THEN "PRUNED_RANGE_SCANS_RECEIVED" WHEN 23 THEN "CONST_PRUNED_RANGE_SCANS_RECEIVED" WHEN 24 THEN "LOCAL_READS" ELSE "<unknown>" END AS co!
unter_name, val FROM `ndbinfo`.`ndb$counters` c LEFT JOIN `ndbinfo`.blocks b ON c.block_number = b.block_number','SET @dummy = 0');
PREPARE stmt FROM @str;
EXECUTE stmt;
DROP PREPARE stmt;
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-11-16 09:29:49 +0000
+++ b/sql/ha_ndbcluster.cc 2011-11-21 12:53:01 +0000
@@ -11011,6 +11011,7 @@ static
void
delete_table_drop_share(NDB_SHARE* share, const char * path)
{
+ DBUG_ENTER("delete_table_drop_share");
if (share)
{
pthread_mutex_lock(&ndbcluster_mutex);
@@ -11042,6 +11043,7 @@ do_drop:
}
pthread_mutex_unlock(&ndbcluster_mutex);
}
+ DBUG_VOID_RETURN;
}
/* static version which does not need a handler */
@@ -11052,7 +11054,7 @@ ha_ndbcluster::drop_table_impl(THD *thd,
const char *db,
const char *table_name)
{
- DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+ DBUG_ENTER("ha_ndbcluster::drop_table_impl");
NDBDICT *dict= ndb->getDictionary();
int ndb_table_id= 0;
int ndb_table_version= 0;
@@ -11189,8 +11191,7 @@ int ha_ndbcluster::delete_table(const ch
{
THD *thd= current_thd;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
- Ndb *ndb;
- int error= 0;
+
DBUG_ENTER("ha_ndbcluster::delete_table");
DBUG_PRINT("enter", ("name: %s", name));
@@ -11202,6 +11203,7 @@ int ha_ndbcluster::delete_table(const ch
dropped inside ndb.
Just drop local files.
*/
+ DBUG_PRINT("info", ("Table is already dropped in NDB"));
delete_table_drop_share(0, name);
DBUG_RETURN(handler::delete_table(name));
}
@@ -11217,16 +11219,12 @@ int ha_ndbcluster::delete_table(const ch
if (check_ndb_connection(thd))
{
- error= HA_ERR_NO_CONNECTION;
- goto err;
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
- ndb= thd_ndb->ndb;
-
if (!thd_ndb->has_required_global_schema_lock("ha_ndbcluster::delete_table"))
{
- error= HA_ERR_NO_CONNECTION;
- goto err;
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
/*
@@ -11234,6 +11232,8 @@ int ha_ndbcluster::delete_table(const ch
If it was already gone it might have been dropped
remotely, give a warning and then drop .ndb file.
*/
+ int error;
+ Ndb* ndb= thd_ndb->ndb;
if (!(error= drop_table_impl(thd, this, ndb, name,
m_dbname, m_tabname)) ||
error == HA_ERR_NO_SUCH_TABLE)
@@ -11244,7 +11244,6 @@ int ha_ndbcluster::delete_table(const ch
error= error1;
}
-err:
DBUG_RETURN(error);
}
@@ -13379,38 +13378,15 @@ static uchar *ndbcluster_get_key(NDB_SHA
#ifndef DBUG_OFF
-static void print_share(const char* where, NDB_SHARE* share)
-{
- fprintf(DBUG_FILE,
- "%s %s.%s: use_count: %u, commit_count: %lu\n",
- where, share->db, share->table_name, share->use_count,
- (ulong) share->commit_count);
- fprintf(DBUG_FILE,
- " - key: %s, key_length: %d\n",
- share->key, share->key_length);
-
- Ndb_event_data *event_data= 0;
- if (share->event_data)
- event_data= share->event_data;
- else if (share->op)
- event_data= (Ndb_event_data *) share->op->getCustomData();
- if (event_data)
- {
- fprintf(DBUG_FILE,
- " - event_data->shadow_table: %p %s.%s\n",
- event_data->shadow_table, event_data->shadow_table->s->db.str,
- event_data->shadow_table->s->table_name.str);
- }
-}
-
-
static void print_ndbcluster_open_tables()
{
DBUG_LOCK_FILE;
fprintf(DBUG_FILE, ">ndbcluster_open_tables\n");
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
- print_share("",
- (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i));
+ {
+ NDB_SHARE* share= (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i);
+ share->print("", DBUG_FILE);
+ }
fprintf(DBUG_FILE, "<ndbcluster_open_tables\n");
DBUG_UNLOCK_FILE;
}
@@ -13425,7 +13401,7 @@ static void print_ndbcluster_open_tables
#define dbug_print_share(t, s) \
DBUG_LOCK_FILE; \
DBUG_EXECUTE("info", \
- print_share((t), (s));); \
+ (s)->print((t), DBUG_FILE);); \
DBUG_UNLOCK_FILE;
@@ -13572,6 +13548,7 @@ int ndbcluster_undo_rename_share(THD *th
return 0;
}
+
int ndbcluster_rename_share(THD *thd, NDB_SHARE *share)
{
NDB_SHARE *tmp;
@@ -13620,11 +13597,7 @@ int ndbcluster_rename_share(THD *thd, ND
ha_ndbcluster::set_tabname(share->new_key, share->table_name);
dbug_print_share("ndbcluster_rename_share:", share);
- Ndb_event_data *event_data= 0;
- if (share->event_data)
- event_data= share->event_data;
- else if (share->op)
- event_data= (Ndb_event_data *) share->op->getCustomData();
+ Ndb_event_data *event_data= share->get_event_data_ptr();
if (event_data && event_data->shadow_table)
{
if (!IS_TMP_PREFIX(share->table_name))
@@ -13673,6 +13646,75 @@ NDB_SHARE *ndbcluster_get_share(NDB_SHAR
}
+
+NDB_SHARE*
+NDB_SHARE::create(const char* key, size_t key_length,
+ TABLE* table, const char* db_name, const char* table_name)
+{
+ NDB_SHARE* share;
+ if (!(share= (NDB_SHARE*) my_malloc(sizeof(*share),
+ MYF(MY_WME | MY_ZEROFILL))))
+ return NULL;
+
+ MEM_ROOT **root_ptr=
+ my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+ MEM_ROOT *old_root= *root_ptr;
+
+ init_sql_alloc(&share->mem_root, 1024, 0);
+ *root_ptr= &share->mem_root; // remember to reset before return
+ share->flags= 0;
+ share->state= NSS_INITIAL;
+ /* Allocate enough space for key, db, and table_name */
+ share->key= (char*) alloc_root(*root_ptr, 2 * (key_length + 1));
+ share->key_length= key_length;
+ strmov(share->key, key);
+ share->db= share->key + key_length + 1;
+ strmov(share->db, db_name);
+ share->table_name= share->db + strlen(share->db) + 1;
+ strmov(share->table_name, table_name);
+ thr_lock_init(&share->lock);
+ pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
+ share->commit_count= 0;
+ share->commit_count_lock= 0;
+
+#ifdef HAVE_NDB_BINLOG
+ share->m_cfn_share= NULL;
+#endif
+
+ share->op= 0;
+ share->new_op= 0;
+ share->event_data= 0;
+
+ {
+ // Create array of bitmap for keeping track of subscribed nodes
+ // NOTE! Only the NDB_SHARE for ndb_schema really needs this
+ int no_nodes= g_ndb_cluster_connection->no_db_nodes();
+ share->subscriber_bitmap= (MY_BITMAP*)
+ alloc_root(&share->mem_root, no_nodes * sizeof(MY_BITMAP));
+ for (int i= 0; i < no_nodes; i++)
+ {
+ bitmap_init(&share->subscriber_bitmap[i],
+ (Uint32*)alloc_root(&share->mem_root, max_ndb_nodes/8),
+ max_ndb_nodes, FALSE);
+ bitmap_clear_all(&share->subscriber_bitmap[i]);
+ }
+ }
+
+ if (ndbcluster_binlog_init_share(current_thd, share, table))
+ {
+ DBUG_PRINT("error", ("get_share: %s could not init share", key));
+ free_root(&share->mem_root, MYF(0));
+ my_free(share, 0);
+ *root_ptr= old_root;
+ return NULL;
+ }
+
+ *root_ptr= old_root;
+
+ return share;
+}
+
+
static inline
NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table,
bool create_if_not_exists)
@@ -13693,50 +13735,30 @@ NDB_SHARE *ndbcluster_get_share(const ch
DBUG_PRINT("error", ("get_share: %s does not exist", key));
DBUG_RETURN(0);
}
- if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
- MYF(MY_WME | MY_ZEROFILL))))
- {
- MEM_ROOT **root_ptr=
- my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
- MEM_ROOT *old_root= *root_ptr;
- init_sql_alloc(&share->mem_root, 1024, 0);
- *root_ptr= &share->mem_root; // remember to reset before return
- share->flags= 0;
- share->state= NSS_INITIAL;
- /* enough space for key, db, and table_name */
- share->key= (char*) alloc_root(*root_ptr, 2 * (length + 1));
- share->key_length= length;
- strmov(share->key, key);
- if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share))
- {
- free_root(&share->mem_root, MYF(0));
- my_free((uchar*) share, 0);
- *root_ptr= old_root;
- DBUG_RETURN(0);
- }
- thr_lock_init(&share->lock);
- pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
- share->commit_count= 0;
- share->commit_count_lock= 0;
- share->db= share->key + length + 1;
- ha_ndbcluster::set_dbname(key, share->db);
- share->table_name= share->db + strlen(share->db) + 1;
- ha_ndbcluster::set_tabname(key, share->table_name);
- if (ndbcluster_binlog_init_share(current_thd, share, table))
- {
- DBUG_PRINT("error", ("get_share: %s could not init share", key));
- ndbcluster_real_free_share(&share);
- *root_ptr= old_root;
- DBUG_RETURN(0);
- }
- *root_ptr= old_root;
- }
- else
+
+ /*
+ Extract db and table name from key (to avoid that NDB_SHARE
+ dependens on ha_ndbcluster)
+ */
+ char db_name_buf[FN_HEADLEN];
+ char table_name_buf[FN_HEADLEN];
+ ha_ndbcluster::set_dbname(key, db_name_buf);
+ ha_ndbcluster::set_tabname(key, table_name_buf);
+
+ if (!(share= NDB_SHARE::create(key, length, table,
+ db_name_buf, table_name_buf)))
{
DBUG_PRINT("error", ("get_share: failed to alloc share"));
my_error(ER_OUTOFMEMORY, MYF(0), static_cast<int>(sizeof(*share)));
DBUG_RETURN(0);
}
+
+ // Insert the new share in list of open shares
+ if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share))
+ {
+ NDB_SHARE::destroy(share);
+ DBUG_RETURN(0);
+ }
}
share->use_count++;
if (opt_ndb_extra_logging > 9)
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2011-11-15 13:13:38 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2011-11-21 12:53:01 +0000
@@ -342,6 +342,13 @@ ndb_binlog_open_shadow_table(THD *thd, N
/* We can't use 'use_all_columns()' as the file object is not setup yet */
shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
&shadow_table->s->all_set);
+
+ if (shadow_table->s->primary_key == MAX_KEY)
+ share->flags|= NSF_HIDDEN_PK;
+
+ if (shadow_table->s->blob_fields != 0)
+ share->flags|= NSF_BLOB_FLAG;
+
#ifndef DBUG_OFF
dbug_print_table("table", shadow_table);
#endif
@@ -355,48 +362,9 @@ ndb_binlog_open_shadow_table(THD *thd, N
*/
int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
{
- MEM_ROOT *mem_root= &share->mem_root;
- int do_event_op= ndb_binlog_running;
- int error= 0;
DBUG_ENTER("ndbcluster_binlog_init_share");
-#ifdef HAVE_NDB_BINLOG
- share->m_cfn_share= NULL;
-#endif
-
- share->op= 0;
- share->new_op= 0;
- share->event_data= 0;
-
- if (!ndb_schema_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
- do_event_op= 1;
- else if (!ndb_apply_status_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
- do_event_op= 1;
-
- if (Ndb_dist_priv_util::is_distributed_priv_table(share->db,
- share->table_name))
- {
- do_event_op= 0;
- }
-
- {
- int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
- share->subscriber_bitmap= (MY_BITMAP*)
- alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
- for (i= 0; i < no_nodes; i++)
- {
- bitmap_init(&share->subscriber_bitmap[i],
- (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
- max_ndb_nodes, FALSE);
- bitmap_clear_all(&share->subscriber_bitmap[i]);
- }
- }
-
- if (!do_event_op)
+ if (!share->need_events(ndb_binlog_running))
{
if (_table)
{
@@ -409,19 +377,10 @@ int ndbcluster_binlog_init_share(THD *th
{
share->flags|= NSF_NO_BINLOG;
}
- DBUG_RETURN(error);
- }
- while (1)
- {
- if ((error= ndb_binlog_open_shadow_table(thd, share)))
- break;
- if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
- share->flags|= NSF_HIDDEN_PK;
- if (share->event_data->shadow_table->s->blob_fields != 0)
- share->flags|= NSF_BLOB_FLAG;
- break;
+ DBUG_RETURN(0);
}
- DBUG_RETURN(error);
+
+ DBUG_RETURN(ndb_binlog_open_shadow_table(thd, share));
}
/*****************************************************************
@@ -2011,19 +1970,21 @@ ndb_handle_schema_change(THD *thd, Ndb *
const Ndb_event_data *event_data)
{
DBUG_ENTER("ndb_handle_schema_change");
- NDB_SHARE *share= event_data->share;
- TABLE *shadow_table= event_data->shadow_table;
- const char *tabname= shadow_table->s->table_name.str;
- const char *dbname= shadow_table->s->db.str;
- bool do_close_cached_tables= FALSE;
- bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
if (pOp->getEventType() == NDBEVENT::TE_ALTER)
{
+ DBUG_PRINT("exit", ("Event type is TE_ALTER"));
DBUG_RETURN(0);
}
+
+ DBUG_ASSERT(event_data);
DBUG_ASSERT(pOp->getEventType() == NDBEVENT::TE_DROP ||
pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE);
+
+ NDB_SHARE *share= event_data->share;
+ TABLE *shadow_table= event_data->shadow_table;
+ const char *tabname= shadow_table->s->table_name.str;
+ const char *dbname= shadow_table->s->db.str;
{
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
@@ -2049,10 +2010,10 @@ ndb_handle_schema_change(THD *thd, Ndb *
{
share->op= 0;
}
- // either just us or drop table handling as well
-
- /* Signal ha_ndbcluster::delete/rename_table that drop is done */
pthread_mutex_unlock(&share->mutex);
+
+ /* Signal ha_ndbcluster::delete/rename_table that drop is done */
+ DBUG_PRINT("info", ("signal that drop is done"));
(void) pthread_cond_signal(&injector_cond);
pthread_mutex_lock(&ndbcluster_mutex);
@@ -2060,6 +2021,9 @@ ndb_handle_schema_change(THD *thd, Ndb *
DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
share->key, share->use_count));
free_share(&share, TRUE);
+
+ bool do_close_cached_tables= FALSE;
+ bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
if (is_remote_change && share && share->state != NSS_DROPPED)
{
DBUG_PRINT("info", ("remote change"));
@@ -2083,15 +2047,13 @@ ndb_handle_schema_change(THD *thd, Ndb *
share= 0;
pthread_mutex_unlock(&ndbcluster_mutex);
- if (event_data)
- {
- delete event_data;
- pOp->setCustomData(NULL);
- }
+ DBUG_PRINT("info", ("Deleting event_data"));
+ delete event_data;
+ pOp->setCustomData(NULL);
+ DBUG_PRINT("info", ("Dropping event operation"));
pthread_mutex_lock(&injector_mutex);
is_ndb->dropEventOperation(pOp);
- pOp= 0;
pthread_mutex_unlock(&injector_mutex);
if (do_close_cached_tables)
@@ -2531,21 +2493,23 @@ class Ndb_schema_event_handler {
void
- log_after_epoch(Ndb_schema_op* schema)
+ handle_after_epoch(Ndb_schema_op* schema)
{
- DBUG_ENTER("log_after_epoch");
+ DBUG_ENTER("handle_after_epoch");
+ DBUG_PRINT("info", ("Pushing Ndb_schema_op on list to be "
+ "handled after epoch"));
assert(!is_post_epoch()); // Only before epoch
- m_post_epoch_log_list.push_back(schema, m_mem_root);
+ m_post_epoch_handle_list.push_back(schema, m_mem_root);
DBUG_VOID_RETURN;
}
void
- unlock_after_epoch(Ndb_schema_op* schema)
+ ack_after_epoch(Ndb_schema_op* schema)
{
- DBUG_ENTER("unlock_after_epoch");
+ DBUG_ENTER("ack_after_epoch");
assert(!is_post_epoch()); // Only before epoch
- m_post_epoch_unlock_list.push_back(schema, m_mem_root);
+ m_post_epoch_ack_list.push_back(schema, m_mem_root);
DBUG_VOID_RETURN;
}
@@ -2559,18 +2523,21 @@ class Ndb_schema_event_handler {
void
ndbapi_invalidate_table(const char* db_name, const char* table_name) const
{
+ DBUG_ENTER("ndbapi_invalidate_table");
Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
Ndb *ndb= thd_ndb->ndb;
ndb->setDatabaseName(db_name);
Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
ndbtab_g.invalidate();
+ DBUG_VOID_RETURN;
}
void
mysqld_close_cached_table(const char* db_name, const char* table_name) const
{
+ DBUG_ENTER("mysqld_close_cached_table");
// Just mark table as "need reopen"
const bool wait_for_refresh = false;
// Not waiting -> no timeout needed
@@ -2583,11 +2550,57 @@ class Ndb_schema_event_handler {
close_cached_tables(m_thd, &table_list,
wait_for_refresh, timeout);
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ mysqld_write_frm_from_ndb(const char* db_name,
+ const char* table_name) const
+ {
+ DBUG_ENTER("mysqld_write_frm_from_ndb");
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Ndb *ndb= thd_ndb->ndb;
+ Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
+ const NDBTAB *ndbtab= ndbtab_g.get_table();
+
+ char key[FN_REFLEN];
+ build_table_filename(key, sizeof(key)-1,
+ db_name, table_name, NullS, 0);
+
+ uchar *data= 0, *pack_data= 0;
+ size_t length, pack_length;
+
+ if (readfrm(key, &data, &length) == 0 &&
+ packfrm(data, length, &pack_data, &pack_length) == 0 &&
+ cmp_frm(ndbtab, pack_data, pack_length))
+ {
+ DBUG_PRINT("info", ("Detected frm change of table %s.%s",
+ db_name, table_name));
+
+ DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
+ ndbtab->getFrmLength());
+ my_free(data);
+ data= NULL;
+
+ int error;
+ if ((error= unpackfrm(&data, &length,
+ (const uchar*) ndbtab->getFrmData())) ||
+ (error= writefrm(key, data, length)))
+ {
+ sql_print_error("NDB: Failed write frm for %s.%s, error %d",
+ db_name, table_name, error);
+ }
+ }
+ my_free(data);
+ my_free(pack_data);
+ DBUG_VOID_RETURN;
}
NDB_SHARE* get_share(Ndb_schema_op* schema) const
{
+ DBUG_ENTER("get_share(Ndb_schema_op*)");
char key[FN_REFLEN + 1];
build_table_filename(key, sizeof(key) - 1,
schema->db, schema->name, "", 0);
@@ -2597,7 +2610,7 @@ class Ndb_schema_event_handler {
DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
share->key, share->use_count));
}
- return share;
+ DBUG_RETURN(share);
}
@@ -2630,18 +2643,17 @@ class Ndb_schema_event_handler {
}
+ bool is_local_table(const char* db_name, const char* table_name) const
+ {
+ return ndbcluster_check_if_local_table(db_name, table_name);
+ }
+
+
void handle_clear_slock(Ndb_schema_op* schema)
{
- if (!is_post_epoch())
- {
- /*
- handle slock after epoch is completed to ensure that
- schema events get inserted in the binlog after any data
- events
- */
- log_after_epoch(schema);
- return;
- }
+ DBUG_ENTER("handle_clear_slock");
+
+ assert(is_post_epoch());
char key[FN_REFLEN + 1];
build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
@@ -2654,7 +2666,7 @@ class Ndb_schema_event_handler {
if (opt_ndb_extra_logging > 19)
sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
key, schema->id, schema->version);
- return;
+ DBUG_VOID_RETURN;
}
if (ndb_schema_object->table_id != schema->id ||
@@ -2670,7 +2682,7 @@ class Ndb_schema_event_handler {
schema->id,
schema->version);
ndb_free_schema_object(&ndb_schema_object);
- return;
+ DBUG_VOID_RETURN;
}
/*
@@ -2699,17 +2711,20 @@ class Ndb_schema_event_handler {
/* Wake up the waiter */
pthread_cond_signal(&injector_cond);
- return;
+
+ DBUG_VOID_RETURN;
}
void
handle_offline_alter_table_commit(Ndb_schema_op* schema)
{
+ DBUG_ENTER("handle_offline_alter_table_commit");
+
assert(is_post_epoch()); // Always after epoch
if (schema->node_id == own_nodeid())
- return;
+ DBUG_VOID_RETURN;
write_schema_op_to_binlog(m_thd, schema);
ndbapi_invalidate_table(schema->db, schema->name);
@@ -2749,7 +2764,7 @@ class Ndb_schema_event_handler {
free_share(&share);
}
- if (ndbcluster_check_if_local_table(schema->db, schema->name) &&
+ if (is_local_table(schema->db, schema->name) &&
!Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
schema->name))
{
@@ -2757,13 +2772,14 @@ class Ndb_schema_event_handler {
"from binlog schema event '%s' from node %d.",
schema->db, schema->name, schema->query,
schema->node_id);
- return;
+ DBUG_VOID_RETURN;
}
if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
{
print_could_not_discover_error(m_thd, schema);
}
+ DBUG_VOID_RETURN;
}
@@ -2775,96 +2791,63 @@ class Ndb_schema_event_handler {
ndbapi_invalidate_table(schema->db, schema->name);
mysqld_close_cached_table(schema->db, schema->name);
- int error= 0;
- Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
- Ndb *ndb= thd_ndb->ndb;
- Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
- const NDBTAB *ndbtab= ndbtab_g.get_table();
if (schema->node_id != own_nodeid())
{
- char key[FN_REFLEN];
- uchar *data= 0, *pack_data= 0;
- size_t length, pack_length;
-
- DBUG_PRINT("info", ("Detected frm change of table %s.%s",
- schema->db, schema->name));
write_schema_op_to_binlog(m_thd, schema);
- build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
- /*
- If the there is no local table shadowing the altered table and
- it has an frm that is different than the one on disk then
- overwrite it with the new table definition
- */
- if (!ndbcluster_check_if_local_table(schema->db, schema->name) &&
- readfrm(key, &data, &length) == 0 &&
- packfrm(data, length, &pack_data, &pack_length) == 0 &&
- cmp_frm(ndbtab, pack_data, pack_length))
- {
- DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
- ndbtab->getFrmLength());
- my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
- data= NULL;
- if ((error= unpackfrm(&data, &length,
- (const uchar*) ndbtab->getFrmData())) ||
- (error= writefrm(key, data, length)))
- {
- sql_print_error("NDB: Failed write frm for %s.%s, error %d",
- schema->db, schema->name, error);
- }
+ if (!is_local_table(schema->db, schema->name))
+ {
+ mysqld_write_frm_from_ndb(schema->db, schema->name);
}
- my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
- my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
}
NDB_SHARE *share= get_share(schema);
if (share)
{
if (opt_ndb_extra_logging > 9)
- sql_print_information("NDB Binlog: handeling online alter/rename");
+ sql_print_information("NDB Binlog: handling online alter/rename");
pthread_mutex_lock(&share->mutex);
ndb_binlog_close_shadow_table(share);
- if ((error= ndb_binlog_open_shadow_table(m_thd, share)))
+ if (ndb_binlog_open_shadow_table(m_thd, share))
+ {
sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
schema->db, schema->name);
- if (error)
pthread_mutex_unlock(&share->mutex);
- }
- if (!error && share)
- {
- if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
- share->flags|= NSF_HIDDEN_PK;
- /*
- Refresh share->flags to handle added BLOB columns
- */
- if (share->event_data->shadow_table->s->blob_fields != 0)
- share->flags|= NSF_BLOB_FLAG;
-
- /*
- Start subscribing to data changes to the new table definition
- */
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, schema->db, schema->name,
- get_binlog_full(share));
- NdbEventOperation *tmp_op= share->op;
- share->new_op= 0;
- share->op= 0;
-
- if (ndbcluster_create_event_ops(m_thd, share, ndbtab, event_name.c_ptr()))
- {
- sql_print_error("NDB Binlog:"
- "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
- event_name.c_ptr());
}
else
{
- share->new_op= share->op;
- }
- share->op= tmp_op;
- pthread_mutex_unlock(&share->mutex);
+ /*
+ Start subscribing to data changes to the new table definition
+ */
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name, schema->db, schema->name,
+ get_binlog_full(share));
+ NdbEventOperation *tmp_op= share->op;
+ share->new_op= 0;
+ share->op= 0;
- if (opt_ndb_extra_logging > 9)
- sql_print_information("NDB Binlog: handeling online alter/rename done");
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Ndb *ndb= thd_ndb->ndb;
+ Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
+ const NDBTAB *ndbtab= ndbtab_g.get_table();
+ if (ndbcluster_create_event_ops(m_thd, share, ndbtab,
+ event_name.c_ptr()))
+ {
+ sql_print_error("NDB Binlog:"
+ "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
+ event_name.c_ptr());
+ }
+ else
+ {
+ share->new_op= share->op;
+ }
+ share->op= tmp_op;
+ pthread_mutex_unlock(&share->mutex);
+
+ if (opt_ndb_extra_logging > 9)
+ sql_print_information("NDB Binlog: handling online "
+ "alter/rename done");
+ }
}
if (share)
{
@@ -2904,6 +2887,314 @@ class Ndb_schema_event_handler {
}
+ void
+ handle_drop_table(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_drop_table");
+
+ assert(is_post_epoch()); // Always after epoch
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ if (is_local_table(schema->db, schema->name))
+ {
+ /* Tables exists as a local table, print error and leave it */
+ sql_print_error("NDB Binlog: Skipping dropping locally "
+ "defined table '%s.%s' from binlog schema "
+ "event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
+ DBUG_VOID_RETURN;
+ }
+
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+ const int no_print_error[2]=
+ {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
+ run_query(m_thd, schema->query,
+ schema->query + schema->query_length,
+ no_print_error);
+
+ NDB_SHARE *share= get_share(schema);
+ // invalidation already handled by binlog thread
+ if (!share || !share->op)
+ {
+ ndbapi_invalidate_table(schema->db, schema->name);
+ mysqld_close_cached_table(schema->db, schema->name);
+ }
+ if (share)
+ free_share(&share);
+
+ ndbapi_invalidate_table(schema->db, schema->name);
+ mysqld_close_cached_table(schema->db, schema->name);
+
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_rename_table_prepare(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_rename_table_prepare");
+
+ assert(is_post_epoch()); // Always after epoch
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ NDB_SHARE *share= get_share(schema);
+ if (share)
+ {
+ ndbcluster_prepare_rename_share(share, schema->query);
+ free_share(&share);
+ }
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_rename_table(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_rename_table");
+
+ assert(is_post_epoch()); // Always after epoch
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ if (is_local_table(schema->db, schema->name))
+ {
+ /* Tables exists as a local table, print error and leave it */
+ sql_print_error("NDB Binlog: Skipping renaming locally "
+ "defined table '%s.%s' from binlog schema "
+ "event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
+ DBUG_VOID_RETURN;
+ }
+
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+ const int no_print_error[2]=
+ {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
+ run_query(m_thd, schema->query,
+ schema->query + schema->query_length,
+ no_print_error);
+
+ NDB_SHARE *share= get_share(schema);
+ // invalidation already handled by binlog thread
+ if (!share || !share->op)
+ {
+ ndbapi_invalidate_table(schema->db, schema->name);
+ mysqld_close_cached_table(schema->db, schema->name);
+ }
+ if (share)
+ free_share(&share);
+
+ share= get_share(schema);
+ if (share)
+ {
+ ndbcluster_rename_share(m_thd, share);
+ free_share(&share);
+ }
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_drop_db(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_drop_db");
+
+ assert(is_post_epoch()); // Always after epoch
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ // Set NO_LOCK_SCHEMA_OP before 'check_if_local_tables_indb'
+ // until ndbcluster_find_files does not take GSL
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+
+ if (check_if_local_tables_in_db(schema->db))
+ {
+ /* Tables exists as a local table, print error and leave it */
+ sql_print_error("NDB Binlog: Skipping drop database '%s' since "
+ "it contained local tables "
+ "binlog schema event '%s' from node %d. ",
+ schema->db, schema->query,
+ schema->node_id);
+ DBUG_VOID_RETURN;
+ }
+
+ const int no_print_error[1]= {0};
+ run_query(m_thd, schema->query,
+ schema->query + schema->query_length,
+ no_print_error);
+
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_truncate_table(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_truncate_table");
+
+ assert(!is_post_epoch()); // Always directly
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ NDB_SHARE *share= get_share(schema);
+ // invalidation already handled by binlog thread
+ if (!share || !share->op)
+ {
+ ndbapi_invalidate_table(schema->db, schema->name);
+ mysqld_close_cached_table(schema->db, schema->name);
+ }
+ if (share)
+ free_share(&share);
+
+ if (is_local_table(schema->db, schema->name))
+ {
+ sql_print_error("NDB Binlog: Skipping locally defined table "
+ "'%s.%s' from binlog schema event '%s' from "
+ "node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
+ DBUG_VOID_RETURN;
+ }
+
+ if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
+ {
+ print_could_not_discover_error(m_thd, schema);
+ }
+
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_create_table(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_create_table");
+
+ assert(!is_post_epoch()); // Always directly
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ if (is_local_table(schema->db, schema->name))
+ {
+ sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
+ "binlog schema event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
+ DBUG_VOID_RETURN;
+ }
+
+ if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
+ {
+ print_could_not_discover_error(m_thd, schema);
+ }
+
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_create_db(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_create_db");
+
+ assert(!is_post_epoch()); // Always directly
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+ const int no_print_error[1]= {0};
+ run_query(m_thd, schema->query,
+ schema->query + schema->query_length,
+ no_print_error);
+
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_alter_db(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_alter_db");
+
+ assert(!is_post_epoch()); // Always directly
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+ const int no_print_error[1]= {0};
+ run_query(m_thd, schema->query,
+ schema->query + schema->query_length,
+ no_print_error);
+
+ DBUG_VOID_RETURN;
+ }
+
+
+ void
+ handle_grant_op(Ndb_schema_op* schema)
+ {
+ DBUG_ENTER("handle_grant_op");
+
+ assert(!is_post_epoch()); // Always directly
+
+ if (schema->node_id == own_nodeid())
+ DBUG_VOID_RETURN;
+
+ write_schema_op_to_binlog(m_thd, schema);
+
+ if (opt_ndb_extra_logging > 9)
+ sql_print_information("Got dist_priv event: %s, "
+ "flushing privileges",
+ get_schema_type_name(schema->type));
+
+ Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+ Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+ const int no_print_error[1]= {0};
+ char *cmd= (char *) "flush privileges";
+ run_query(m_thd, cmd,
+ cmd + strlen(cmd),
+ no_print_error);
+
+ DBUG_VOID_RETURN;
+ }
+
+
int
handle_schema_op(Ndb_schema_op* schema)
{
@@ -2937,179 +3228,75 @@ class Ndb_schema_event_handler {
switch (schema_type)
{
case SOT_CLEAR_SLOCK:
- handle_clear_slock(schema);
+ /*
+ handle slock after epoch is completed to ensure that
+ schema events get inserted in the binlog after any data
+ events
+ */
+ handle_after_epoch(schema);
DBUG_RETURN(0);
case SOT_ALTER_TABLE_COMMIT:
case SOT_RENAME_TABLE_PREPARE:
case SOT_ONLINE_ALTER_TABLE_PREPARE:
case SOT_ONLINE_ALTER_TABLE_COMMIT:
- log_after_epoch(schema);
- unlock_after_epoch(schema);
+ case SOT_RENAME_TABLE:
+ case SOT_DROP_TABLE:
+ case SOT_DROP_DB:
+ handle_after_epoch(schema);
+ ack_after_epoch(schema);
DBUG_RETURN(0);
- default:
+ case SOT_TRUNCATE_TABLE:
+ handle_truncate_table(schema);
break;
- }
- if (schema->node_id != own_nodeid())
- {
- THD* thd= m_thd; // Code compatibility
- Thd_ndb *thd_ndb= get_thd_ndb(thd);
- Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+ case SOT_CREATE_TABLE:
+ handle_create_table(schema);
+ break;
- int post_epoch_unlock= 0;
-
- switch (schema_type)
- {
- case SOT_RENAME_TABLE:
- case SOT_RENAME_TABLE_NEW:
- case SOT_DROP_TABLE:
- if (! ndbcluster_check_if_local_table(schema->db, schema->name))
- {
- thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- const int no_print_error[2]=
- {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
- run_query(thd, schema->query,
- schema->query + schema->query_length,
- no_print_error);
- /* binlog dropping table after any table operations */
- log_after_epoch(schema);
- /* acknowledge this query _after_ epoch completion */
- post_epoch_unlock= 1;
- }
- else
- {
- /* Tables exists as a local table, print error and leave it */
- DBUG_PRINT("info", ("Found local table '%s.%s', leaving it",
- schema->db, schema->name));
- sql_print_error("NDB Binlog: Skipping %sing locally "
- "defined table '%s.%s' from binlog schema "
- "event '%s' from node %d. ",
- (schema_type == SOT_DROP_TABLE ? "dropp" : "renam"),
- schema->db, schema->name, schema->query,
- schema->node_id);
- write_schema_op_to_binlog(thd, schema);
- }
- // Fall through
- case SOT_TRUNCATE_TABLE:
- {
- NDB_SHARE *share= get_share(schema);
- // invalidation already handled by binlog thread
- if (!share || !share->op)
- {
- ndbapi_invalidate_table(schema->db, schema->name);
- mysqld_close_cached_table(schema->db, schema->name);
- }
- if (share)
- free_share(&share);
- }
- if (schema_type != SOT_TRUNCATE_TABLE)
- break;
- // fall through
- case SOT_CREATE_TABLE:
- thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- if (ndbcluster_check_if_local_table(schema->db, schema->name))
- {
- DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
- schema->db, schema->name));
- sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
- "binlog schema event '%s' from node %d. ",
- schema->db, schema->name, schema->query,
- schema->node_id);
- }
- else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
- {
- print_could_not_discover_error(thd, schema);
- }
- write_schema_op_to_binlog(thd, schema);
- break;
+ case SOT_CREATE_DB:
+ handle_create_db(schema);
+ break;
- case SOT_DROP_DB:
- /* Drop the database locally if it only contains ndb tables */
- thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- if (!check_if_local_tables_in_db(schema->db))
- {
- const int no_print_error[1]= {0};
- run_query(thd, schema->query,
- schema->query + schema->query_length,
- no_print_error);
- /* binlog dropping database after any table operations */
- log_after_epoch(schema);
- /* acknowledge this query _after_ epoch completion */
- post_epoch_unlock= 1;
- }
- else
- {
- /* Database contained local tables, leave it */
- sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
- "binlog schema event '%s' from node %d. ",
- schema->db, schema->query,
- schema->node_id);
- write_schema_op_to_binlog(thd, schema);
- }
- break;
+ case SOT_ALTER_DB:
+ handle_alter_db(schema);
+ break;
- case SOT_CREATE_DB:
- case SOT_ALTER_DB:
- {
- thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- const int no_print_error[1]= {0};
- run_query(thd, schema->query,
- schema->query + schema->query_length,
- no_print_error);
- write_schema_op_to_binlog(thd, schema);
- break;
- }
+ case SOT_CREATE_USER:
+ case SOT_DROP_USER:
+ case SOT_RENAME_USER:
+ case SOT_GRANT:
+ case SOT_REVOKE:
+ handle_grant_op(schema);
+ break;
- case SOT_CREATE_USER:
- case SOT_DROP_USER:
- case SOT_RENAME_USER:
- case SOT_GRANT:
- case SOT_REVOKE:
- {
- if (opt_ndb_extra_logging > 9)
- sql_print_information("Got dist_priv event: %s, "
- "flushing privileges",
- get_schema_type_name(schema_type));
-
- thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- const int no_print_error[1]= {0};
- char *cmd= (char *) "flush privileges";
- run_query(thd, cmd,
- cmd + strlen(cmd),
- no_print_error);
- write_schema_op_to_binlog(thd, schema);
- break;
- }
-
- case SOT_TABLESPACE:
- case SOT_LOGFILE_GROUP:
- write_schema_op_to_binlog(thd, schema);
+ case SOT_TABLESPACE:
+ case SOT_LOGFILE_GROUP:
+ if (schema->node_id == own_nodeid())
break;
+ write_schema_op_to_binlog(m_thd, schema);
+ break;
- case SOT_ALTER_TABLE_COMMIT:
- case SOT_RENAME_TABLE_PREPARE:
- case SOT_ONLINE_ALTER_TABLE_PREPARE:
- case SOT_ONLINE_ALTER_TABLE_COMMIT:
- case SOT_CLEAR_SLOCK:
- // Impossible to come here, the above types has already
- // been handled and caused the function to return
- abort();
- break;
+ case SOT_RENAME_TABLE_NEW:
+ /*
+ Only very old MySQL Server connected to the cluster may
+ send this schema operation, ignore it
+ */
+ sql_print_error("NDB schema: Skipping old schema operation"
+ "(RENAME_TABLE_NEW) on %s.%s",
+ schema->db, schema->name);
+ DBUG_ASSERT(false);
+ break;
- }
+ }
- /* signal that schema operation has been handled */
- DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
- if (bitmap_is_set(&schema->slock, own_nodeid()))
- {
- if (post_epoch_unlock)
- unlock_after_epoch(schema);
- else
- ack_schema_op(schema->db, schema->name,
- schema->id, schema->version);
- }
+ /* signal that schema operation has been handled */
+ DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
+ if (bitmap_is_set(&schema->slock, own_nodeid()))
+ {
+ ack_schema_op(schema->db, schema->name,
+ schema->id, schema->version);
}
}
DBUG_RETURN(0);
@@ -3120,7 +3307,6 @@ class Ndb_schema_event_handler {
handle_schema_op_post_epoch(Ndb_schema_op* schema)
{
DBUG_ENTER("handle_schema_op_post_epoch");
- THD* thd = m_thd; // Code compatibility
DBUG_PRINT("enter", ("%s.%s: query: '%s' type: %d",
schema->db, schema->name,
schema->query, schema->type));
@@ -3140,39 +3326,20 @@ class Ndb_schema_event_handler {
break;
case SOT_DROP_DB:
- write_schema_op_to_binlog(thd, schema);
+ handle_drop_db(schema);
break;
case SOT_DROP_TABLE:
- write_schema_op_to_binlog(thd, schema);
- ndbapi_invalidate_table(schema->db, schema->name);
- mysqld_close_cached_table(schema->db, schema->name);
+ handle_drop_table(schema);
break;
- case SOT_RENAME_TABLE:
- {
- write_schema_op_to_binlog(thd, schema);
- NDB_SHARE *share= get_share(schema);
- if (share)
- {
- ndbcluster_rename_share(thd, share);
- free_share(&share);
- }
+ case SOT_RENAME_TABLE_PREPARE:
+ handle_rename_table_prepare(schema);
break;
- }
- case SOT_RENAME_TABLE_PREPARE:
- {
- if (schema->node_id == own_nodeid())
- break;
- NDB_SHARE *share= get_share(schema);
- if (share)
- {
- ndbcluster_prepare_rename_share(share, schema->query);
- free_share(&share);
- }
+ case SOT_RENAME_TABLE:
+ handle_rename_table(schema);
break;
- }
case SOT_ALTER_TABLE_COMMIT:
handle_offline_alter_table_commit(schema);
@@ -3186,46 +3353,6 @@ class Ndb_schema_event_handler {
handle_online_alter_table_commit(schema);
break;
- case SOT_RENAME_TABLE_NEW:
- {
- write_schema_op_to_binlog(thd, schema);
- NDB_SHARE *share= get_share(schema);
- if (ndb_binlog_running && (!share || !share->op))
- {
- /*
- we need to free any share here as command below
- may need to call handle_trailing_share
- */
- if (share)
- {
- /* ndb_share reference temporary free */
- DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- share= 0;
- }
-
- if (ndbcluster_check_if_local_table(schema->db, schema->name))
- {
- DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
- schema->db, schema->name));
- sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
- "binlog schema event '%s' from node %d. ",
- schema->db, schema->name, schema->query,
- schema->node_id);
- }
- else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
- {
- print_could_not_discover_error(thd, schema);
- }
- }
- if (share)
- {
- free_share(&share);
- }
- break;
- }
-
default:
DBUG_ASSERT(FALSE);
}
@@ -3234,39 +3361,6 @@ class Ndb_schema_event_handler {
DBUG_VOID_RETURN;
}
-
- /*
- process any operations that should be done after
- the epoch is complete
- */
- void
- handle_schema_log_post_epoch(List<Ndb_schema_op> *log_list)
- {
- DBUG_ENTER("handle_schema_log_post_epoch");
-
- Ndb_schema_op* schema;
- while ((schema= log_list->pop()))
- {
- handle_schema_op_post_epoch(schema);
- }
- DBUG_VOID_RETURN;
- }
-
-
- void
- handle_schema_unlock_post_epoch(List<Ndb_schema_op> *unlock_list)
- {
- DBUG_ENTER("handle_schema_unlock_post_epoch");
-
- Ndb_schema_op *schema;
- while ((schema= unlock_list->pop()))
- {
- ack_schema_op(schema->db, schema->name,
- schema->id, schema->version);
- }
- DBUG_VOID_RETURN;
- }
-
THD* m_thd;
MEM_ROOT* m_mem_root;
uint m_own_nodeid;
@@ -3274,8 +3368,8 @@ class Ndb_schema_event_handler {
bool is_post_epoch(void) const { return m_post_epoch; };
- List<Ndb_schema_op> m_post_epoch_log_list;
- List<Ndb_schema_op> m_post_epoch_unlock_list;
+ List<Ndb_schema_op> m_post_epoch_handle_list;
+ List<Ndb_schema_op> m_post_epoch_ack_list;
public:
Ndb_schema_event_handler(); // Not implemented
@@ -3291,8 +3385,8 @@ public:
~Ndb_schema_event_handler()
{
// There should be no work left todo...
- DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
- DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+ DBUG_ASSERT(m_post_epoch_handle_list.elements == 0);
+ DBUG_ASSERT(m_post_epoch_ack_list.elements == 0);
}
@@ -3436,23 +3530,39 @@ public:
void post_epoch()
{
- if (m_post_epoch_log_list.elements > 0)
+ if (unlikely(m_post_epoch_handle_list.elements > 0))
{
// Set the flag used to check that functions are called at correct time
m_post_epoch= true;
- handle_schema_log_post_epoch(&m_post_epoch_log_list);
- // NOTE post_epoch_unlock_list may not be handled!
- handle_schema_unlock_post_epoch(&m_post_epoch_unlock_list);
+ /*
+ process any operations that should be done after
+ the epoch is complete
+ */
+ Ndb_schema_op* schema;
+ while ((schema= m_post_epoch_handle_list.pop()))
+ {
+ handle_schema_op_post_epoch(schema);
+ }
+
+ /*
+ process any operations that should be unlocked/acked after
+ the epoch is complete
+ */
+ while ((schema= m_post_epoch_ack_list.pop()))
+ {
+ ack_schema_op(schema->db, schema->name,
+ schema->id, schema->version);
+ }
}
// There should be no work left todo...
- DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
- DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+ DBUG_ASSERT(m_post_epoch_handle_list.elements == 0);
+ DBUG_ASSERT(m_post_epoch_ack_list.elements == 0);
}
};
/*********************************************************************
- Internal helper functions for handeling of the cluster replication tables
+ Internal helper functions for handling of the cluster replication tables
- ndb_binlog_index
- ndb_apply_status
*********************************************************************/
@@ -5081,8 +5191,9 @@ ndbcluster_check_if_local_table(const ch
char ndb_file[FN_REFLEN + 1];
DBUG_ENTER("ndbcluster_check_if_local_table");
- build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
- build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
+ build_table_filename(key, sizeof(key)-1, dbname, tabname, reg_ext, 0);
+ build_table_filename(ndb_file, sizeof(ndb_file)-1,
+ dbname, tabname, ha_ndb_ext, 0);
/* Check that any defined table is an ndb table */
DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
@@ -5107,7 +5218,6 @@ int ndbcluster_create_binlog_setup(THD *
const char *table_name,
TABLE * table)
{
- int do_event_op= ndb_binlog_running;
DBUG_ENTER("ndbcluster_create_binlog_setup");
DBUG_PRINT("enter",("key: %s key_len: %d %s.%s",
key, key_len, db, table_name));
@@ -5131,25 +5241,7 @@ int ndbcluster_create_binlog_setup(THD *
DBUG_RETURN(0); // replication already setup, or should not
}
- if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
- {
- // The distributed privilege tables are distributed by writing
- // the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
- // to listen to events from this table
- DBUG_PRINT("info", ("Skipping binlogging of table %s/%s", db, table_name));
- do_event_op= 0;
- }
-
- if (!ndb_schema_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
- do_event_op= 1;
- else if (!ndb_apply_status_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
- do_event_op= 1;
-
- if (!do_event_op)
+ if (!share->need_events(ndb_binlog_running))
{
set_binlog_nologging(share);
pthread_mutex_unlock(&share->mutex);
@@ -5256,11 +5348,8 @@ ndbcluster_create_event(THD *thd, Ndb *n
ndbtab->getName(), ndbtab->getObjectVersion(),
event_name, share ? share->key : "(nil)"));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
- if (!share)
- {
- DBUG_PRINT("info", ("share == NULL"));
- DBUG_RETURN(0);
- }
+ DBUG_ASSERT(share);
+
if (get_binlog_nologging(share))
{
if (opt_ndb_extra_logging && ndb_binlog_running)
@@ -5440,8 +5529,7 @@ ndbcluster_create_event_ops(THD *thd, ND
DBUG_ENTER("ndbcluster_create_event_ops");
DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
-
- DBUG_ASSERT(share != 0);
+ DBUG_ASSERT(share);
if (get_binlog_nologging(share))
{
@@ -5455,7 +5543,6 @@ ndbcluster_create_event_ops(THD *thd, ND
assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
share->table_name));
- Ndb_event_data *event_data= share->event_data;
int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
#ifdef HAVE_NDB_BINLOG
uint len= (int)strlen(share->table_name);
@@ -5480,6 +5567,10 @@ ndbcluster_create_event_ops(THD *thd, ND
DBUG_RETURN(0);
}
+ // Check that the share agrees
+ DBUG_ASSERT(share->need_events(ndb_binlog_running));
+
+ Ndb_event_data *event_data= share->event_data;
if (share->op)
{
event_data= (Ndb_event_data *) share->op->getCustomData();
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-11-16 09:29:49 +0000
+++ b/sql/log_event.cc 2011-11-21 12:53:01 +0000
@@ -9036,7 +9036,7 @@ int Rows_log_event::do_apply_event(Relay
Note that unlike the other thd options set here, this one
comes from a global, and not from the incoming event.
*/
- if (slave_allow_batching)
+ if (opt_slave_allow_batching)
thd->variables.option_bits|= OPTION_ALLOW_BATCH;
else
thd->variables.option_bits&= ~OPTION_ALLOW_BATCH;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2011-11-16 09:29:49 +0000
+++ b/sql/mysqld.h 2011-11-21 12:53:01 +0000
@@ -179,7 +179,9 @@ extern ulong slow_launch_threads, slow_l
extern ulong table_cache_size, table_def_size;
extern MYSQL_PLUGIN_IMPORT ulong max_connections;
extern ulong max_connect_errors, connect_timeout;
-extern my_bool slave_allow_batching;
+#ifndef MCP_WL3733
+extern my_bool opt_slave_allow_batching;
+#endif
extern my_bool allow_slave_start;
extern LEX_CSTRING reason_slave_blocked;
extern ulong slave_trans_retries;
=== modified file 'sql/ndb_event_data.cc'
--- a/sql/ndb_event_data.cc 2011-10-29 09:02:21 +0000
+++ b/sql/ndb_event_data.cc 2011-11-10 08:21:36 +0000
@@ -42,3 +42,13 @@ Ndb_event_data::~Ndb_event_data()
*/
my_free(ndb_value[0]);
}
+
+
+void Ndb_event_data::print(const char* where, FILE* file) const
+{
+ fprintf(file,
+ "%s shadow_table: %p '%s.%s'\n",
+ where,
+ shadow_table, shadow_table->s->db.str,
+ shadow_table->s->table_name.str);
+}
=== modified file 'sql/ndb_event_data.h'
--- a/sql/ndb_event_data.h 2011-10-29 09:02:21 +0000
+++ b/sql/ndb_event_data.h 2011-11-10 08:21:36 +0000
@@ -34,6 +34,8 @@ public:
struct TABLE *shadow_table;
struct NDB_SHARE *share;
union NdbValue *ndb_value[2];
+
+ void print(const char* where, FILE* file) const;
};
#endif
=== modified file 'sql/ndb_schema_dist.cc'
--- a/sql/ndb_schema_dist.cc 2011-11-02 09:28:48 +0000
+++ b/sql/ndb_schema_dist.cc 2011-11-10 08:29:32 +0000
@@ -26,8 +26,6 @@ get_schema_type_name(uint type)
return "DROP_TABLE";
case SOT_CREATE_TABLE:
return "CREATE_TABLE";
- case SOT_RENAME_TABLE_NEW:
- return "RENAME_TABLE_NEW";
case SOT_ALTER_TABLE_COMMIT:
return "ALTER_TABLE_COMMIT";
case SOT_DROP_DB:
=== modified file 'sql/ndb_schema_dist.h'
--- a/sql/ndb_schema_dist.h 2011-11-02 09:28:48 +0000
+++ b/sql/ndb_schema_dist.h 2011-11-10 08:29:32 +0000
@@ -40,7 +40,7 @@ enum SCHEMA_OP_TYPE
{
SOT_DROP_TABLE= 0,
SOT_CREATE_TABLE= 1,
- SOT_RENAME_TABLE_NEW= 2,
+ SOT_RENAME_TABLE_NEW= 2, // Unused, but still reserved
SOT_ALTER_TABLE_COMMIT= 3,
SOT_DROP_DB= 4,
SOT_CREATE_DB= 5,
=== modified file 'sql/ndb_share.cc'
--- a/sql/ndb_share.cc 2011-11-07 19:00:35 +0000
+++ b/sql/ndb_share.cc 2011-11-10 08:21:36 +0000
@@ -17,6 +17,10 @@
#include "ndb_share.h"
#include "ndb_event_data.h"
+#include "ndb_dist_priv_util.h"
+#include "ha_ndbcluster_tables.h"
+
+#include <ndbapi/NdbEventOperation.hpp>
#include <my_sys.h>
@@ -37,12 +41,109 @@ NDB_SHARE::destroy(NDB_SHARE* share)
}
#endif
share->new_op= 0;
- if (share->event_data)
+ Ndb_event_data* event_data = share->event_data;
+ if (event_data)
{
- delete share->event_data;
- share->event_data= 0;
+ delete event_data;
+ event_data= 0;
}
free_root(&share->mem_root, MYF(0));
my_free(share);
}
+
+bool
+NDB_SHARE::need_events(bool default_on) const
+{
+ DBUG_ENTER("NDB_SHARE::need_events");
+ DBUG_PRINT("enter", ("db: %s, table_name: %s",
+ db, table_name));
+
+ if (default_on)
+ {
+ // Events are on by default, check if it should be turned off
+
+ if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
+ {
+ /*
+ The distributed privilege tables are distributed by writing
+ the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
+ to listen to events from those table
+ */
+ DBUG_PRINT("exit", ("no events for dist priv table"));
+ DBUG_RETURN(false);
+ }
+
+ DBUG_PRINT("exit", ("need events(the default for this mysqld)"));
+ DBUG_RETURN(true);
+ }
+
+ // Events are off by default, check if it should be turned on
+ if (strcmp(db, NDB_REP_DB) == 0)
+ {
+ // The table is in "mysql" database
+ if (strcmp(table_name, NDB_SCHEMA_TABLE) == 0)
+ {
+ DBUG_PRINT("exit", ("need events for " NDB_SCHEMA_TABLE));
+ DBUG_RETURN(true);
+ }
+
+ if (strcmp(table_name, NDB_APPLY_TABLE) == 0)
+ {
+ DBUG_PRINT("exit", ("need events for " NDB_APPLY_TABLE));
+ DBUG_RETURN(true);
+ }
+ }
+
+ DBUG_PRINT("exit", ("no events(the default for this mysqld)"));
+ DBUG_RETURN(false);
+}
+
+
+Ndb_event_data* NDB_SHARE::get_event_data_ptr() const
+{
+ if (event_data)
+ {
+ // The event_data pointer is only used before
+ // creating the NdbEventoperation -> check no op yet
+ assert(!op);
+
+ return event_data;
+ }
+
+ if (op)
+ {
+ // The event_data should now be empty since it's been moved to
+ // op's custom data
+ assert(!event_data);
+
+ // Check that op has custom data
+ assert(op->getCustomData());
+
+ return (Ndb_event_data*)op->getCustomData();
+ }
+
+ return NULL;
+}
+
+
+void NDB_SHARE::print(const char* where, FILE* file) const
+{
+ fprintf(file, "%s %s.%s: use_count: %u\n",
+ where, db, table_name, use_count);
+ fprintf(file, " - key: '%s', key_length: %d\n", key, key_length);
+ fprintf(file, " - commit_count: %llu\n", commit_count);
+ if (new_key)
+ fprintf(file, " - new_key: %p, '%s'\n",
+ new_key, new_key);
+ if (event_data)
+ fprintf(file, " - event_data: %p\n", event_data);
+ if (op)
+ fprintf(file, " - op: %p\n", op);
+ if (new_op)
+ fprintf(file, " - new_op: %p\n", new_op);
+
+ Ndb_event_data *event_data_ptr= get_event_data_ptr();
+ if (event_data_ptr)
+ event_data_ptr->print(" -", file);
+}
=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h 2011-11-05 11:35:57 +0000
+++ b/sql/ndb_share.h 2011-11-10 08:21:36 +0000
@@ -188,7 +188,20 @@ struct NDB_SHARE {
MY_BITMAP *subscriber_bitmap;
class NdbEventOperation *new_op;
+ static NDB_SHARE* create(const char* key, size_t key_length,
+ struct TABLE* table, const char* db_name,
+ const char* table_name);
static void destroy(NDB_SHARE* share);
+
+ class Ndb_event_data* get_event_data_ptr() const;
+
+ void print(const char* where, FILE* file = stderr) const;
+
+ /*
+ Returns true if this share need to subscribe to
+ events from the table.
+ */
+ bool need_events(bool default_on) const;
};
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2011-11-16 09:29:49 +0000
+++ b/sql/sys_vars.cc 2011-11-21 12:53:01 +0000
@@ -2819,40 +2819,6 @@ static Sys_var_ulong Sys_profiling_histo
VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1));
#endif
-#ifndef MCP_WL3733
-#ifndef EMBEDDED_LIBRARY
-my_bool slave_allow_batching;
-
-/*
- Take Active MI lock while checking/updating slave_allow_batching
- to give atomicity w.r.t. slave state changes
-*/
-static PolyLock_mutex PLock_active_mi(&LOCK_active_mi);
-
-static bool slave_allow_batching_check(sys_var *self, THD *thd, set_var *var)
-{
- /* Only allow a change if the slave SQL thread is currently stopped */
- bool slave_sql_running = active_mi->rli->slave_running;
-
- if (slave_sql_running)
- {
- my_error(ER_SLAVE_MUST_STOP, MYF(0));
- return true;
- }
-
- return false;
-}
-
-static Sys_var_mybool Sys_slave_allow_batching(
- "slave_allow_batching", "Allow slave to batch requests",
- GLOBAL_VAR(slave_allow_batching),
- CMD_LINE(OPT_ARG), DEFAULT(FALSE),
- &PLock_active_mi,
- NOT_IN_BINLOG,
- ON_CHECK(slave_allow_batching_check));
-#endif
-#endif
-
static Sys_var_harows Sys_select_limit(
"sql_select_limit",
"The maximum number of rows to return from SELECT statements",
@@ -3384,6 +3350,14 @@ static Sys_var_mybool Sys_relay_log_reco
"processed",
GLOBAL_VAR(relay_log_recovery), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+#ifndef MCP_WL3733
+my_bool opt_slave_allow_batching;
+static Sys_var_mybool Sys_slave_allow_batching(
+ "slave_allow_batching", "Allow slave to batch requests",
+ GLOBAL_VAR(opt_slave_allow_batching),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+#endif
+
static Sys_var_charptr Sys_slave_load_tmpdir(
"slave_load_tmpdir", "The location where the slave should put "
"its temporary files when replicating a LOAD DATA INFILE command",
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2011-11-18 06:47:23 +0000
@@ -194,6 +194,7 @@
#define NDBMT_BLOCK_MASK ((1 << NDBMT_BLOCK_BITS) - 1)
#define NDBMT_BLOCK_INSTANCE_BITS 7
+#define NDB_DEFAULT_LOG_PARTS 4
#define NDB_MAX_LOG_PARTS 4
#define MAX_NDBMT_LQH_WORKERS NDB_MAX_LOG_PARTS
#define MAX_NDBMT_LQH_THREADS NDB_MAX_LOG_PARTS
=== modified file 'storage/ndb/src/common/debugger/EventLogger.cpp'
--- a/storage/ndb/src/common/debugger/EventLogger.cpp 2011-10-21 12:36:44 +0000
+++ b/storage/ndb/src/common/debugger/EventLogger.cpp 2011-11-18 06:47:23 +0000
@@ -527,23 +527,47 @@ void getTextTransReportCounters(QQQQ) {
// -------------------------------------------------------------------
// Report information about transaction activity once per 10 seconds.
// -------------------------------------------------------------------
- BaseString::snprintf(m_text, m_text_len,
- "Trans. Count = %u, Commit Count = %u, "
- "Read Count = %u, Simple Read Count = %u, "
- "Write Count = %u, AttrInfo Count = %u, "
- "Concurrent Operations = %u, Abort Count = %u"
- " Scans = %u Range scans = %u",
- theData[1],
- theData[2],
- theData[3],
- theData[4],
- theData[5],
- theData[6],
- theData[7],
- theData[8],
- theData[9],
- theData[10]);
+ if (len <= 11)
+ {
+ BaseString::snprintf(m_text, m_text_len,
+ "Trans. Count = %u, Commit Count = %u, "
+ "Read Count = %u, Simple Read Count = %u, "
+ "Write Count = %u, AttrInfo Count = %u, "
+ "Concurrent Operations = %u, Abort Count = %u"
+ " Scans = %u Range scans = %u",
+ theData[1],
+ theData[2],
+ theData[3],
+ theData[4],
+ theData[5],
+ theData[6],
+ theData[7],
+ theData[8],
+ theData[9],
+ theData[10]);
+ }
+ else
+ {
+ BaseString::snprintf(m_text, m_text_len,
+ "Trans. Count = %u, Commit Count = %u, "
+ "Read Count = %u, Simple Read Count = %u, "
+ "Write Count = %u, AttrInfo Count = %u, "
+ "Concurrent Operations = %u, Abort Count = %u"
+ " Scans = %u Range scans = %u, Local Read Count = %u",
+ theData[1],
+ theData[2],
+ theData[3],
+ theData[4],
+ theData[5],
+ theData[6],
+ theData[7],
+ theData[8],
+ theData[9],
+ theData[10],
+ theData[11]);
+ }
}
+
void getTextOperationReportCounters(QQQQ) {
BaseString::snprintf(m_text, m_text_len,
"Operations=%u",
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2011-11-16 09:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2011-11-21 12:53:01 +0000
@@ -207,7 +207,8 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
const Uint32 tab = signal->theData[1];
const Uint32 ver = signal->theData[2];
TableRecordPtr tabRecPtr;
- c_tableRecordPool.getPtr(tabRecPtr, tab);
+ bool ok = find_object(tabRecPtr, tab);
+ ndbrequire(ok);
DropTableReq * req = (DropTableReq*)signal->getDataPtr();
req->senderData = 1225;
req->senderRef = numberToRef(1,1);
@@ -252,8 +253,8 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
{
RSS_AP_SNAPSHOT_SAVE(c_rope_pool);
RSS_AP_SNAPSHOT_SAVE(c_attributeRecordPool);
- RSS_AP_SNAPSHOT_SAVE(c_tableRecordPool);
- RSS_AP_SNAPSHOT_SAVE(c_triggerRecordPool);
+ RSS_AP_SNAPSHOT_SAVE(c_tableRecordPool_);
+ RSS_AP_SNAPSHOT_SAVE(c_triggerRecordPool_);
RSS_AP_SNAPSHOT_SAVE(c_obj_pool);
RSS_AP_SNAPSHOT_SAVE(c_hash_map_pool);
RSS_AP_SNAPSHOT_SAVE(g_hash_map);
@@ -263,8 +264,8 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
{
RSS_AP_SNAPSHOT_CHECK(c_rope_pool);
RSS_AP_SNAPSHOT_CHECK(c_attributeRecordPool);
- RSS_AP_SNAPSHOT_CHECK(c_tableRecordPool);
- RSS_AP_SNAPSHOT_CHECK(c_triggerRecordPool);
+ RSS_AP_SNAPSHOT_CHECK(c_tableRecordPool_);
+ RSS_AP_SNAPSHOT_CHECK(c_triggerRecordPool_);
RSS_AP_SNAPSHOT_CHECK(c_obj_pool);
RSS_AP_SNAPSHOT_CHECK(c_hash_map_pool);
RSS_AP_SNAPSHOT_CHECK(g_hash_map);
@@ -296,16 +297,16 @@ void Dbdict::execDBINFO_SCANREQ(Signal *
c_attributeRecordPool.getUsedHi(),
{ CFG_DB_NO_ATTRIBUTES,0,0,0 }},
{ "Table Record",
- c_tableRecordPool.getUsed(),
+ c_tableRecordPool_.getUsed(),
c_noOfMetaTables,
- c_tableRecordPool.getEntrySize(),
- c_tableRecordPool.getUsedHi(),
+ c_tableRecordPool_.getEntrySize(),
+ c_tableRecordPool_.getUsedHi(),
{ CFG_DB_NO_TABLES,0,0,0 }},
{ "Trigger Record",
- c_triggerRecordPool.getUsed(),
- c_triggerRecordPool.getSize(),
- c_triggerRecordPool.getEntrySize(),
- c_triggerRecordPool.getUsedHi(),
+ c_triggerRecordPool_.getUsed(),
+ c_triggerRecordPool_.getSize(),
+ c_triggerRecordPool_.getEntrySize(),
+ c_triggerRecordPool_.getUsedHi(),
{ CFG_DB_NO_TRIGGERS,0,0,0 }},
{ "FS Connect Record",
c_fsConnectRecordPool.getUsed(),
@@ -512,8 +513,8 @@ void Dbdict::packTableIntoPages(Signal*
case DictTabInfo::OrderedIndex:{
jam();
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, tableId);
- if (tablePtr.p->m_obj_ptr_i == RNIL)
+ bool ok = find_object(tablePtr, tableId);
+ if (!ok)
{
jam();
sendGET_TABINFOREF(signal, &req_copy,
@@ -698,7 +699,15 @@ Dbdict::packTableIntoPages(SimplePropert
{
jam();
TableRecordPtr primTab;
- c_tableRecordPool.getPtr(primTab, tablePtr.p->primaryTableId);
+ bool ok = find_object(primTab, tablePtr.p->primaryTableId);
+ if (!ok)
+ {
+ jam();
+ ndbrequire(signal != NULL);
+ Uint32 err = CreateFragmentationRef::InvalidPrimaryTable;
+ signal->theData[0] = err;
+ return;
+ }
ConstRope r2(c_rope_pool, primTab.p->tableName);
r2.copy(tableName);
w.add(DictTabInfo::PrimaryTable, tableName);
@@ -893,8 +902,6 @@ Dbdict::execCREATE_FRAGMENTATION_REQ(Sig
return;
}
- TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, req->primaryTableId);
XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * te = getTableEntry(xsf, req->primaryTableId);
if (te->m_tableState != SchemaFile::SF_CREATE)
@@ -911,8 +918,9 @@ Dbdict::execCREATE_FRAGMENTATION_REQ(Sig
}
DictObjectPtr obj_ptr;
- c_obj_pool.getPtr(obj_ptr, tablePtr.p->m_obj_ptr_i);
-
+ TableRecordPtr tablePtr;
+ bool ok = find_object(obj_ptr, tablePtr, req->primaryTableId);
+ ndbrequire(ok);
SchemaOpPtr op_ptr;
findDictObjectOp(op_ptr, obj_ptr);
ndbrequire(!op_ptr.isNull());
@@ -2233,8 +2241,6 @@ void Dbdict::initRecords()
{
initNodeRecords();
initPageRecords();
- initTableRecords();
- initTriggerRecords();
}//Dbdict::initRecords()
void Dbdict::initSendSchemaRecord()
@@ -2315,27 +2321,12 @@ void Dbdict::initPageRecords()
c_schemaRecord.oldSchemaPage = NDB_SF_MAX_PAGES;
}//Dbdict::initPageRecords()
-void Dbdict::initTableRecords()
-{
- TableRecordPtr tablePtr;
- while (1) {
- jam();
- refresh_watch_dog();
- c_tableRecordPool.seize(tablePtr);
- if (tablePtr.i == RNIL) {
- jam();
- break;
- }//if
- initialiseTableRecord(tablePtr);
- }//while
-}//Dbdict::initTableRecords()
-
-void Dbdict::initialiseTableRecord(TableRecordPtr tablePtr)
+void Dbdict::initialiseTableRecord(TableRecordPtr tablePtr, Uint32 tableId)
{
new (tablePtr.p) TableRecord();
tablePtr.p->filePtr[0] = RNIL;
tablePtr.p->filePtr[1] = RNIL;
- tablePtr.p->tableId = tablePtr.i;
+ tablePtr.p->tableId = tableId;
tablePtr.p->tableVersion = (Uint32)-1;
tablePtr.p->fragmentType = DictTabInfo::AllNodesSmallTable;
tablePtr.p->gciTableCreated = 0;
@@ -2375,29 +2366,15 @@ void Dbdict::initialiseTableRecord(Table
tablePtr.p->m_obj_ptr_i = RNIL;
}//Dbdict::initialiseTableRecord()
-void Dbdict::initTriggerRecords()
-{
- TriggerRecordPtr triggerPtr;
- while (1) {
- jam();
- refresh_watch_dog();
- c_triggerRecordPool.seize(triggerPtr);
- if (triggerPtr.i == RNIL) {
- jam();
- break;
- }//if
- initialiseTriggerRecord(triggerPtr);
- }//while
-}
-
-void Dbdict::initialiseTriggerRecord(TriggerRecordPtr triggerPtr)
+void Dbdict::initialiseTriggerRecord(TriggerRecordPtr triggerPtr, Uint32 triggerId)
{
new (triggerPtr.p) TriggerRecord();
triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
- triggerPtr.p->triggerId = RNIL;
+ triggerPtr.p->triggerId = triggerId;
triggerPtr.p->tableId = RNIL;
triggerPtr.p->attributeMask.clear();
triggerPtr.p->indexId = RNIL;
+ triggerPtr.p->m_obj_ptr_i = RNIL;
}
Uint32 Dbdict::getFsConnRecord()
@@ -2438,40 +2415,81 @@ Uint32 Dbdict::getFreeObjId(bool both)
return RNIL;
}
-Uint32 Dbdict::getFreeTableRecord()
+bool Dbdict::seizeTableRecord(TableRecordPtr& tablePtr, Uint32& schemaFileId)
{
- Uint32 i = getFreeObjId();
- if (i == RNIL) {
+ if (schemaFileId == RNIL)
+ {
jam();
- return RNIL;
+ schemaFileId = getFreeObjId();
+ }
+ if (schemaFileId == RNIL)
+ {
+ jam();
+ return false;
}
- if (i >= c_noOfMetaTables) {
+ if (schemaFileId >= c_noOfMetaTables)
+ {
jam();
- return RNIL;
+ return false;
}
- TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, i);
- initialiseTableRecord(tablePtr);
- return i;
+ c_tableRecordPool_.seize(tablePtr);
+ if (tablePtr.isNull())
+ {
+ jam();
+ return false;
+ }
+ initialiseTableRecord(tablePtr, schemaFileId);
+ return true;
}
Uint32 Dbdict::getFreeTriggerRecord()
{
- const Uint32 size = c_triggerRecordPool.getSize();
+ const Uint32 size = c_triggerRecordPool_.getSize();
TriggerRecordPtr triggerPtr;
- for (triggerPtr.i = 0; triggerPtr.i < size; triggerPtr.i++) {
+ for (Uint32 id = 0; id < size; id++) {
jam();
- c_triggerRecordPool.getPtr(triggerPtr);
- if (triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED) {
+ bool ok = find_object(triggerPtr, id);
+ if (!ok)
+ {
jam();
- initialiseTriggerRecord(triggerPtr);
- return triggerPtr.i;
+ return id;
}
}
return RNIL;
}
+bool Dbdict::seizeTriggerRecord(TriggerRecordPtr& triggerPtr, Uint32 triggerId)
+{
+ if (triggerId == RNIL)
+ {
+ triggerId = getFreeTriggerRecord();
+ }
+ else
+ {
+ TriggerRecordPtr ptr;
+ bool ok = find_object(ptr, triggerId);
+ if (ok)
+ { // triggerId already in use
+ jam();
+ return false;
+ }
+ }
+ if (triggerId == RNIL)
+ {
+ jam();
+ return false;
+ }
+ c_triggerRecordPool_.seize(triggerPtr);
+ if (triggerPtr.isNull())
+ {
+ jam();
+ return false;
+ }
+ initialiseTriggerRecord(triggerPtr, triggerId);
+ return true;
+}
+
Uint32
Dbdict::check_read_obj(Uint32 objId, Uint32 transId)
{
@@ -2654,9 +2672,9 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
c_nodes.setSize(MAX_NDB_NODES);
c_pageRecordArray.setSize(ZNUMBER_OF_PAGES);
c_schemaPageRecordArray.setSize(2 * NDB_SF_MAX_PAGES);
- c_tableRecordPool.setSize(c_noOfMetaTables);
+ c_tableRecordPool_.setSize(c_noOfMetaTables);
g_key_descriptor_pool.setSize(c_noOfMetaTables);
- c_triggerRecordPool.setSize(c_maxNoOfTriggers);
+ c_triggerRecordPool_.setSize(c_maxNoOfTriggers);
Record_info ri;
OpSectionBuffer::createRecordInfo(ri, RT_DBDICT_OP_SECTION_BUFFER);
@@ -2933,9 +2951,9 @@ Dbdict::initSchemaFile_conf(Signal* sign
}
void
-Dbdict::activateIndexes(Signal* signal, Uint32 i)
+Dbdict::activateIndexes(Signal* signal, Uint32 id)
{
- if (i == 0)
+ if (id == 0)
D("activateIndexes start");
Uint32 requestFlags = 0;
@@ -2961,12 +2979,16 @@ Dbdict::activateIndexes(Signal* signal,
}
TableRecordPtr indexPtr;
- indexPtr.i = i;
- for (; indexPtr.i < c_noOfMetaTables; indexPtr.i++)
+ for (; id < c_noOfMetaTables; id++)
{
- c_tableRecordPool.getPtr(indexPtr);
+ bool ok = find_object(indexPtr, id);
+ if (!ok)
+ {
+ jam();
+ continue;
+ }
- if (check_read_obj(indexPtr.i))
+ if (check_read_obj(id))
{
continue;
}
@@ -2984,7 +3006,7 @@ Dbdict::activateIndexes(Signal* signal,
}
// wl3600_todo use simple schema trans when implemented
- D("activateIndexes i=" << indexPtr.i);
+ D("activateIndexes id=" << id);
TxHandlePtr tx_ptr;
seizeTxHandle(tx_ptr);
@@ -3026,8 +3048,10 @@ Dbdict::activateIndex_fromBeginTrans(Sig
ndbrequire(!tx_ptr.isNull());
TableRecordPtr indexPtr;
- indexPtr.i = tx_ptr.p->m_userData;
- c_tableRecordPool.getPtr(indexPtr);
+ c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
+ ndbrequire(!indexPtr.isNull());
+ DictObjectPtr index_obj_ptr;
+ c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
AlterIndxReq* req = (AlterIndxReq*)signal->getDataPtrSend();
@@ -3040,7 +3064,7 @@ Dbdict::activateIndex_fromBeginTrans(Sig
req->transId = tx_ptr.p->m_transId;
req->transKey = tx_ptr.p->m_transKey;
req->requestInfo = requestInfo;
- req->indexId = indexPtr.i;
+ req->indexId = index_obj_ptr.p->m_id;
req->indexVersion = indexPtr.p->tableVersion;
Callback c = {
@@ -3091,13 +3115,13 @@ Dbdict::activateIndex_fromEndTrans(Signa
ndbrequire(!tx_ptr.isNull());
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, tx_ptr.p->m_userData);
+ c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
+ DictObjectPtr index_obj_ptr;
+ c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
char indexName[MAX_TAB_NAME_SIZE];
{
- DictObjectPtr obj_ptr;
- c_obj_pool.getPtr(obj_ptr, indexPtr.p->m_obj_ptr_i);
- LocalRope name(c_rope_pool, obj_ptr.p->m_name);
+ LocalRope name(c_rope_pool, index_obj_ptr.p->m_name);
name.copy(indexName);
}
@@ -3106,38 +3130,43 @@ Dbdict::activateIndex_fromEndTrans(Signa
{
jam();
infoEvent("DICT: activate index %u done (%s)",
- indexPtr.i, indexName);
+ index_obj_ptr.p->m_id, indexName);
}
else
{
jam();
warningEvent("DICT: activate index %u error: code=%u line=%u node=%u (%s)",
- indexPtr.i,
+ index_obj_ptr.p->m_id,
error.errorCode, error.errorLine, error.errorNodeId,
indexName);
}
+ Uint32 id = index_obj_ptr.p->m_id;
releaseTxHandle(tx_ptr);
- activateIndexes(signal, indexPtr.i + 1);
+ activateIndexes(signal, id + 1);
}
void
-Dbdict::rebuildIndexes(Signal* signal, Uint32 i)
+Dbdict::rebuildIndexes(Signal* signal, Uint32 id)
{
- if (i == 0)
+ if (id == 0)
D("rebuildIndexes start");
TableRecordPtr indexPtr;
- indexPtr.i = i;
- for (; indexPtr.i < c_noOfMetaTables; indexPtr.i++) {
- c_tableRecordPool.getPtr(indexPtr);
- if (check_read_obj(indexPtr.i))
+ for (; id < c_noOfMetaTables; id++) {
+ bool ok = find_object(indexPtr, id);
+ if (!ok)
+ {
+ jam();
+ continue;
+ }
+ if (check_read_obj(id))
continue;
if (!indexPtr.p->isIndex())
continue;
// wl3600_todo use simple schema trans when implemented
- D("rebuildIndexes i=" << indexPtr.i);
+ D("rebuildIndexes id=" << id);
TxHandlePtr tx_ptr;
seizeTxHandle(tx_ptr);
@@ -3176,8 +3205,9 @@ Dbdict::rebuildIndex_fromBeginTrans(Sign
ndbrequire(!tx_ptr.isNull());
TableRecordPtr indexPtr;
- indexPtr.i = tx_ptr.p->m_userData;
- c_tableRecordPool.getPtr(indexPtr);
+ c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
+ DictObjectPtr index_obj_ptr;
+ c_obj_pool.getPtr(index_obj_ptr,indexPtr.p->m_obj_ptr_i);
BuildIndxReq* req = (BuildIndxReq*)signal->getDataPtrSend();
@@ -3193,7 +3223,7 @@ Dbdict::rebuildIndex_fromBeginTrans(Sign
req->buildId = 0;
req->buildKey = 0;
req->tableId = indexPtr.p->primaryTableId;
- req->indexId = indexPtr.i;
+ req->indexId = index_obj_ptr.p->m_id;
req->indexType = indexPtr.p->tableType;
req->parallelism = 16;
@@ -3245,7 +3275,7 @@ Dbdict::rebuildIndex_fromEndTrans(Signal
ndbrequire(!tx_ptr.isNull());
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, tx_ptr.p->m_userData);
+ c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
const char* actionName;
{
@@ -3254,10 +3284,11 @@ Dbdict::rebuildIndex_fromEndTrans(Signal
actionName = !noBuild ? "rebuild" : "online";
}
+ DictObjectPtr obj_ptr;
+ c_obj_pool.getPtr(obj_ptr, indexPtr.p->m_obj_ptr_i);
+
char indexName[MAX_TAB_NAME_SIZE];
{
- DictObjectPtr obj_ptr;
- c_obj_pool.getPtr(obj_ptr, indexPtr.p->m_obj_ptr_i);
LocalRope name(c_rope_pool, obj_ptr.p->m_name);
name.copy(indexName);
}
@@ -3267,20 +3298,20 @@ Dbdict::rebuildIndex_fromEndTrans(Signal
jam();
infoEvent(
"DICT: %s index %u done (%s)",
- actionName, indexPtr.i, indexName);
+ actionName, obj_ptr.p->m_id, indexName);
} else {
jam();
warningEvent(
"DICT: %s index %u error: code=%u line=%u node=%u (%s)",
actionName,
- indexPtr.i, error.errorCode, error.errorLine, error.errorNodeId,
+ obj_ptr.p->m_id, error.errorCode, error.errorLine, error.errorNodeId,
indexName);
}
- Uint32 i = tx_ptr.p->m_userData;
+ Uint32 id = obj_ptr.p->m_id;
releaseTxHandle(tx_ptr);
- rebuildIndexes(signal, i + 1);
+ rebuildIndexes(signal, id + 1);
}
/* **************************************************************** */
@@ -4693,13 +4724,13 @@ Dbdict::release_object(Uint32 obj_ptr_i,
LocalRope name(c_rope_pool, obj_name);
name.erase();
+jam();
c_obj_name_hash.remove(ptr);
- if (!DictTabInfo::isTrigger(obj_ptr_p->m_type))
- {
- jam();
- c_obj_id_hash.remove(ptr);
- }
+jam();
+ c_obj_id_hash.remove(ptr);
+jam();
c_obj_pool.release(ptr);
+jam();
}
void
@@ -4774,19 +4805,16 @@ void Dbdict::handleTabInfoInit(Signal *
}
TableRecordPtr tablePtr;
+ Uint32 schemaFileId;
switch (parseP->requestType) {
case DictTabInfo::CreateTableFromAPI: {
jam();
}
case DictTabInfo::AlterTableFromAPI:{
jam();
- tablePtr.i = getFreeTableRecord();
- /* ---------------------------------------------------------------- */
- // Check if no free tables existed.
- /* ---------------------------------------------------------------- */
- tabRequire(tablePtr.i != RNIL, CreateTableRef::NoMoreTableRecords);
-
- c_tableRecordPool.getPtr(tablePtr);
+ schemaFileId = RNIL;
+ bool ok = seizeTableRecord(tablePtr,schemaFileId);
+ tabRequire(ok, CreateTableRef::NoMoreTableRecords);
break;
}
case DictTabInfo::AddTableFromDict:
@@ -4796,20 +4824,16 @@ void Dbdict::handleTabInfoInit(Signal *
/* ---------------------------------------------------------------- */
// Get table id and check that table doesn't already exist
/* ---------------------------------------------------------------- */
- tablePtr.i = c_tableDesc.TableId;
-
if (parseP->requestType == DictTabInfo::ReadTableFromDiskSR) {
- ndbrequire(tablePtr.i == c_restartRecord.activeTable);
+ ndbrequire(c_tableDesc.TableId == c_restartRecord.activeTable);
}//if
if (parseP->requestType == DictTabInfo::GetTabInfoConf) {
- ndbrequire(tablePtr.i == c_restartRecord.activeTable);
+ ndbrequire(c_tableDesc.TableId == c_restartRecord.activeTable);
}//if
- c_tableRecordPool.getPtr(tablePtr);
-
- //Uint32 oldTableVersion = tablePtr.p->tableVersion;
- initialiseTableRecord(tablePtr);
-
+ schemaFileId = c_tableDesc.TableId;
+ bool ok = seizeTableRecord(tablePtr,schemaFileId);
+ ndbrequire(ok); // Already exists or out of memory
/* ---------------------------------------------------------------- */
// Set table version
/* ---------------------------------------------------------------- */
@@ -4835,7 +4859,7 @@ void Dbdict::handleTabInfoInit(Signal *
DictObjectPtr obj_ptr;
ndbrequire(c_obj_pool.seize(obj_ptr));
new (obj_ptr.p) DictObject;
- obj_ptr.p->m_id = tablePtr.i;
+ obj_ptr.p->m_id = schemaFileId;
obj_ptr.p->m_type = c_tableDesc.TableType;
obj_ptr.p->m_name = tablePtr.p->tableName;
obj_ptr.p->m_ref_count = 0;
@@ -4847,11 +4871,11 @@ void Dbdict::handleTabInfoInit(Signal *
{
g_eventLogger->info("Dbdict: %u: create name=%s,id=%u,obj_ptr_i=%d",__LINE__,
c_tableDesc.TableName,
- tablePtr.i, tablePtr.p->m_obj_ptr_i);
+ schemaFileId, tablePtr.p->m_obj_ptr_i);
}
send_event(signal, trans_ptr,
NDB_LE_CreateSchemaObject,
- tablePtr.i,
+ schemaFileId,
tablePtr.p->tableVersion,
c_tableDesc.TableType);
}
@@ -4995,12 +5019,12 @@ void Dbdict::handleTabInfoInit(Signal *
ndbrequire(c_tableDesc.UpdateTriggerId != RNIL);
ndbrequire(c_tableDesc.DeleteTriggerId != RNIL);
ndbout_c("table: %u UPGRADE saving (%u/%u/%u)",
- tablePtr.i,
+ schemaFileId,
c_tableDesc.InsertTriggerId,
c_tableDesc.UpdateTriggerId,
c_tableDesc.DeleteTriggerId);
infoEvent("table: %u UPGRADE saving (%u/%u/%u)",
- tablePtr.i,
+ schemaFileId,
c_tableDesc.InsertTriggerId,
c_tableDesc.UpdateTriggerId,
c_tableDesc.DeleteTriggerId);
@@ -5059,66 +5083,78 @@ Dbdict::upgrade_seizeTrigger(TableRecord
* The insert trigger will be "main" trigger so
* it does not need any special treatment
*/
- const Uint32 size = c_triggerRecordPool.getSize();
+ const Uint32 size = c_triggerRecordPool_.getSize();
ndbrequire(updateTriggerId == RNIL || updateTriggerId < size);
ndbrequire(deleteTriggerId == RNIL || deleteTriggerId < size);
+ DictObjectPtr tab_obj_ptr;
+ c_obj_pool.getPtr(tab_obj_ptr, tabPtr.p->m_obj_ptr_i);
+
TriggerRecordPtr triggerPtr;
if (updateTriggerId != RNIL)
{
jam();
- c_triggerRecordPool.getPtr(triggerPtr, updateTriggerId);
- if (triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED)
+ bool ok = find_object(triggerPtr, updateTriggerId);
+ if (!ok)
{
jam();
- initialiseTriggerRecord(triggerPtr);
+ bool ok = seizeTriggerRecord(triggerPtr, updateTriggerId);
+ if (!ok)
+ {
+ jam();
+ ndbrequire(ok);
+ }
triggerPtr.p->triggerState = TriggerRecord::TS_FAKE_UPGRADE;
- triggerPtr.p->triggerId = triggerPtr.i;
triggerPtr.p->tableId = tabPtr.p->primaryTableId;
- triggerPtr.p->indexId = tabPtr.i;
+ triggerPtr.p->indexId = tab_obj_ptr.p->m_id;
TriggerInfo::packTriggerInfo(triggerPtr.p->triggerInfo,
g_hashIndexTriggerTmpl[0].triggerInfo);
char buf[256];
BaseString::snprintf(buf, sizeof(buf),
- "UPG_UPD_NDB$INDEX_%u_UI", tabPtr.i);
+ "UPG_UPD_NDB$INDEX_%u_UI", tab_obj_ptr.p->m_id);
{
LocalRope name(c_rope_pool, triggerPtr.p->triggerName);
name.assign(buf);
}
DictObjectPtr obj_ptr;
- bool ok = c_obj_pool.seize(obj_ptr);
+ ok = c_obj_pool.seize(obj_ptr);
ndbrequire(ok);
new (obj_ptr.p) DictObject();
obj_ptr.p->m_name = triggerPtr.p->triggerName;
obj_ptr.p->m_ref_count = 0;
- triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
obj_ptr.p->m_id = triggerPtr.p->triggerId;
obj_ptr.p->m_type =TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
+ link_object(obj_ptr, triggerPtr);
c_obj_name_hash.add(obj_ptr);
+ c_obj_id_hash.add(obj_ptr);
}
}
if (deleteTriggerId != RNIL)
{
jam();
- c_triggerRecordPool.getPtr(triggerPtr, deleteTriggerId);
- if (triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED)
+ bool ok = find_object(triggerPtr, deleteTriggerId); // TODO: msundell seizeTriggerRecord
+ if (!ok)
{
jam();
- initialiseTriggerRecord(triggerPtr);
+ bool ok = seizeTriggerRecord(triggerPtr, deleteTriggerId);
+ if (!ok)
+ {
+ jam();
+ ndbrequire(ok);
+ }
triggerPtr.p->triggerState = TriggerRecord::TS_FAKE_UPGRADE;
- triggerPtr.p->triggerId = triggerPtr.i;
triggerPtr.p->tableId = tabPtr.p->primaryTableId;
- triggerPtr.p->indexId = tabPtr.i;
+ triggerPtr.p->indexId = tab_obj_ptr.p->m_id;
TriggerInfo::packTriggerInfo(triggerPtr.p->triggerInfo,
g_hashIndexTriggerTmpl[0].triggerInfo);
char buf[256];
BaseString::snprintf(buf, sizeof(buf),
- "UPG_DEL_NDB$INDEX_%u_UI", tabPtr.i);
+ "UPG_DEL_NDB$INDEX_%u_UI", tab_obj_ptr.p->m_id);
{
LocalRope name(c_rope_pool, triggerPtr.p->triggerName);
@@ -5126,17 +5162,18 @@ Dbdict::upgrade_seizeTrigger(TableRecord
}
DictObjectPtr obj_ptr;
- bool ok = c_obj_pool.seize(obj_ptr);
+ ok = c_obj_pool.seize(obj_ptr);
ndbrequire(ok);
new (obj_ptr.p) DictObject();
obj_ptr.p->m_name = triggerPtr.p->triggerName;
obj_ptr.p->m_ref_count = 0;
- triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
obj_ptr.p->m_id = triggerPtr.p->triggerId;
obj_ptr.p->m_type =TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
+ link_object(obj_ptr, triggerPtr);
c_obj_name_hash.add(obj_ptr);
+ c_obj_id_hash.add(obj_ptr);
}
}
}
@@ -5787,20 +5824,20 @@ Dbdict::createTable_parse(Signal* signal
TableRecordPtr tabPtr = parseRecord.tablePtr;
// link operation to object seized in handleTabInfoInit
+ DictObjectPtr obj_ptr;
{
- DictObjectPtr obj_ptr;
Uint32 obj_ptr_i = tabPtr.p->m_obj_ptr_i;
bool ok = findDictObject(op_ptr, obj_ptr, obj_ptr_i);
ndbrequire(ok);
}
{
- Uint32 version = getTableEntry(tabPtr.i)->m_tableVersion;
+ Uint32 version = getTableEntry(obj_ptr.p->m_id)->m_tableVersion;
tabPtr.p->tableVersion = create_obj_inc_schema_version(version);
}
// fill in table id and version
- impl_req->tableId = tabPtr.i;
+ impl_req->tableId = obj_ptr.p->m_id;
impl_req->tableVersion = tabPtr.p->tableVersion;
if (ERROR_INSERTED(6202) ||
@@ -5939,7 +5976,13 @@ Dbdict::createTable_parse(Signal* signal
}
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
+ if (!ok)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+ return;
+ }
tabPtr.p->packedSize = tabInfoPtr.sz;
// wl3600_todo verify version on slave
tabPtr.p->tableVersion = tableVersion;
@@ -6032,7 +6075,7 @@ Dbdict::createTable_prepare(Signal* sign
Uint32 tableId = createTabPtr.p->m_request.tableId;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
Callback cb;
cb.m_callbackData = op_ptr.p->op_key;
@@ -6048,6 +6091,7 @@ Dbdict::createTable_prepare(Signal* sign
return;
}
+ ndbrequire(ok);
bool savetodisk = !(tabPtr.p->m_bits & TableRecord::TR_Temporary);
if (savetodisk)
{
@@ -6111,7 +6155,8 @@ Dbdict::createTab_local(Signal* signal,
createTabPtr.p->m_callback = * c;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+ bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+ ndbrequire(ok);
/**
* Start by createing table in LQH
@@ -6141,7 +6186,7 @@ Dbdict::createTab_local(Signal* signal,
* Create KeyDescriptor
*/
{
- KeyDescriptor* desc= g_key_descriptor_pool.getPtr(tabPtr.i);
+ KeyDescriptor* desc= g_key_descriptor_pool.getPtr(createTabPtr.p->m_request.tableId);
new (desc) KeyDescriptor();
if (tabPtr.p->primaryTableId == RNIL)
@@ -6225,7 +6270,8 @@ Dbdict::execCREATE_TAB_CONF(Signal* sign
createTabPtr.p->m_lqhFragPtr = conf->lqhConnectPtr;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+ bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+ ndbrequire(ok);
sendLQHADDATTRREQ(signal, op_ptr, tabPtr.p->m_attributes.firstItem);
}
@@ -6240,7 +6286,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal
getOpRec(op_ptr, createTabPtr);
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+ bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+ ndbrequire(ok);
const bool isHashIndex = tabPtr.p->isHashIndex();
@@ -6368,8 +6415,8 @@ Dbdict::createTab_dih(Signal* signal, Sc
D("createTab_dih" << *op_ptr.p);
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
-
+ bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+ ndbrequire(ok);
/**
* NOTE: use array access here...
@@ -6380,7 +6427,7 @@ Dbdict::createTab_dih(Signal* signal, Sc
DiAddTabReq * req = (DiAddTabReq*)signal->getDataPtrSend();
req->connectPtr = op_ptr.p->op_key;
- req->tableId = tabPtr.i;
+ req->tableId = createTabPtr.p->m_request.tableId;
req->fragType = tabPtr.p->fragmentType;
req->kValue = tabPtr.p->kValue;
req->noOfReplicas = 0;
@@ -6480,7 +6527,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal)
TableRecordPtr tabPtr;
if (AlterTableReq::getAddFragFlag(changeMask))
{
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
+ ndbrequire(ok);
if (DictTabInfo::isTable(tabPtr.p->tableType))
{
jam();
@@ -6507,7 +6555,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal)
findSchemaOp(op_ptr, createTabPtr, senderData);
ndbrequire(!op_ptr.isNull());
createTabPtr.p->m_dihAddFragPtr = dihPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
+ ndbrequire(ok);
}
#if 0
@@ -6575,7 +6624,8 @@ Dbdict::execLQHFRAGCONF(Signal * signal)
jam();
SchemaOpPtr op_ptr;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
+ ndbrequire(ok);
if (DictTabInfo::isTable(tabPtr.p->tableType))
{
AlterTableRecPtr alterTabPtr;
@@ -6628,7 +6678,8 @@ Dbdict::execLQHFRAGREF(Signal * signal)
jam();
SchemaOpPtr op_ptr;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
+ ndbrequire(ok);
if (DictTabInfo::isTable(tabPtr.p->tableType))
{
jam();
@@ -6716,13 +6767,14 @@ Dbdict::execTAB_COMMITCONF(Signal* signa
//const CreateTabReq* impl_req = &createTabPtr.p->m_request;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+ bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+ ndbrequire(ok);
if (refToBlock(signal->getSendersBlockRef()) == DBLQH) {
jam();
// prepare table in DBTC
TcSchVerReq * req = (TcSchVerReq*)signal->getDataPtr();
- req->tableId = tabPtr.i;
+ req->tableId = createTabPtr.p->m_request.tableId;
req->tableVersion = tabPtr.p->tableVersion;
req->tableLogged = (Uint32)!!(tabPtr.p->m_bits & TableRecord::TR_Logged);
req->senderRef = reference();
@@ -6736,7 +6788,8 @@ Dbdict::execTAB_COMMITCONF(Signal* signa
{
jam();
TableRecordPtr basePtr;
- c_tableRecordPool.getPtr(basePtr, tabPtr.p->primaryTableId);
+ bool ok = find_object(basePtr, tabPtr.p->primaryTableId);
+ ndbrequire(ok);
req->userDefinedPartition = (basePtr.p->fragmentType == DictTabInfo::UserDefined);
}
@@ -6750,7 +6803,7 @@ Dbdict::execTAB_COMMITCONF(Signal* signa
// commit table in DBTC
signal->theData[0] = op_ptr.p->op_key;
signal->theData[1] = reference();
- signal->theData[2] = tabPtr.i;
+ signal->theData[2] = createTabPtr.p->m_request.tableId;
sendSignal(DBTC_REF, GSN_TAB_COMMITREQ, signal, 3, JBB);
return;
@@ -6815,7 +6868,8 @@ Dbdict::createTable_commit(Signal* signa
Uint32 tableId = createTabPtr.p->m_request.tableId;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
+ ndbrequire(ok);
D("createTable_commit" << *op_ptr.p);
@@ -6827,9 +6881,10 @@ Dbdict::createTable_commit(Signal* signa
if (DictTabInfo::isIndex(tabPtr.p->tableType))
{
TableRecordPtr basePtr;
- c_tableRecordPool.getPtr(basePtr, tabPtr.p->primaryTableId);
+ bool ok = find_object(basePtr, tabPtr.p->primaryTableId);
+ ndbrequire(ok);
- LocalTableRecord_list list(c_tableRecordPool, basePtr.p->m_indexes);
+ LocalTableRecord_list list(c_tableRecordPool_, basePtr.p->m_indexes);
list.add(tabPtr);
}
}
@@ -6875,7 +6930,8 @@ Dbdict::createTab_alterComplete(Signal*
const CreateTabReq* impl_req = &createTabPtr.p->m_request;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, impl_req->tableId);
+ bool ok = find_object(tabPtr, impl_req->tableId);
+ ndbrequire(ok);
D("createTab_alterComplete" << *op_ptr.p);
@@ -6937,13 +6993,17 @@ Dbdict::createTable_abortParse(Signal* s
}
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, tableId);
+ bool ok = find_object(tabPtr, tableId);
// any link was to a new object
if (hasDictObject(op_ptr)) {
jam();
unlinkDictObject(op_ptr);
- releaseTableObject(tableId, true);
+ if (ok)
+ {
+ jam();
+ releaseTableObject(tabPtr.i, true);
+ }
}
} while (0);
@@ -6962,13 +7022,14 @@ Dbdict::createTable_abortPrepare(Signal*
D("createTable_abortPrepare" << *op_ptr.p);
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, impl_req->tableId);
+ bool ok = find_object(tabPtr, impl_req->tableId);
+ ndbrequire(ok);
// create drop table operation wl3600_todo must pre-allocate
SchemaOpPtr oplnk_ptr;
DropTableRecPtr dropTabPtr;
- bool ok = seizeLinkedSchemaOp(op_ptr, oplnk_ptr, dropTabPtr);
+ ok = seizeLinkedSchemaOp(op_ptr, oplnk_ptr, dropTabPtr);
ndbrequire(ok);
DropTabReq* aux_impl_req = &dropTabPtr.p->m_request;
@@ -7024,10 +7085,12 @@ Dbdict::createTable_abortLocalConf(Signa
Uint32 tableId = impl_req->tableId;
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, tableId);
-
- releaseTableObject(tableId);
-
+ bool ok = find_object(tablePtr, tableId);
+ if (ok)
+ {
+ jam();
+ releaseTableObject(tablePtr.i);
+ }
createTabPtr.p->m_abortPrepareDone = true;
sendTransConf(signal, op_ptr);
}
@@ -7069,10 +7132,10 @@ void Dbdict::execCREATE_TABLE_REF(Signal
handleDictRef(signal, ref);
}
-void Dbdict::releaseTableObject(Uint32 tableId, bool removeFromHash)
+void Dbdict::releaseTableObject(Uint32 table_ptr_i, bool removeFromHash)
{
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, tableId);
+ c_tableRecordPool_.getPtr(tablePtr, table_ptr_i);
if (removeFromHash)
{
jam();
@@ -7124,9 +7187,12 @@ void Dbdict::releaseTableObject(Uint32 t
{
jam();
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, triggerId);
- triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
- release_object(triggerPtr.p->m_obj_ptr_i);
+ bool ok = find_object(triggerPtr, triggerId);
+ if (ok)
+ {
+ release_object(triggerPtr.p->m_obj_ptr_i);
+ c_triggerRecordPool_.release(triggerPtr);
+ }
}
triggerId = tablePtr.p->m_upgrade_trigger_handling.deleteTriggerId;
@@ -7134,13 +7200,16 @@ void Dbdict::releaseTableObject(Uint32 t
{
jam();
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, triggerId);
- triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
- release_object(triggerPtr.p->m_obj_ptr_i);
+ bool ok = find_object(triggerPtr, triggerId);
+ if (ok)
+ {
+ release_object(triggerPtr.p->m_obj_ptr_i);
+ c_triggerRecordPool_.release(triggerPtr);
+ }
}
}
}
-
+ c_tableRecordPool_.release(tablePtr);
}//releaseTableObject()
// CreateTable: END
@@ -7252,6 +7321,7 @@ Dbdict::dropTable_parse(Signal* signal,
getOpRec(op_ptr, dropTabPtr);
DropTabReq* impl_req = &dropTabPtr.p->m_request;
Uint32 tableId = impl_req->tableId;
+ Uint32 err;
TableRecordPtr tablePtr;
if (!(tableId < c_noOfMetaTables)) {
@@ -7259,7 +7329,23 @@ Dbdict::dropTable_parse(Signal* signal,
setError(error, DropTableRef::NoSuchTable, __LINE__);
return;
}
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+
+ err = check_read_obj(impl_req->tableId, trans_ptr.p->m_transId);
+ if (err)
+ {
+ jam();
+ setError(error, err, __LINE__);
+ return;
+ }
+
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ if (!ok)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+ return;
+ }
+
// check version first (api will retry)
if (tablePtr.p->tableVersion != impl_req->tableVersion) {
@@ -7275,7 +7361,7 @@ Dbdict::dropTable_parse(Signal* signal,
return;
}
- if (check_write_obj(tablePtr.i,
+ if (check_write_obj(impl_req->tableId,
trans_ptr.p->m_transId,
SchemaFile::SF_DROP, error))
{
@@ -7301,7 +7387,7 @@ Dbdict::dropTable_parse(Signal* signal,
SchemaFile::TableEntry te; te.init();
te.m_tableState = SchemaFile::SF_DROP;
te.m_transId = trans_ptr.p->m_transId;
- Uint32 err = trans_log_schema_op(op_ptr, tableId, &te);
+ err = trans_log_schema_op(op_ptr, tableId, &te);
if (err)
{
jam();
@@ -7398,7 +7484,8 @@ Dbdict::dropTable_backup_mutex_locked(Si
const DropTabReq* impl_req = &dropTabPtr.p->m_request;
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ ndbrequire(ok);
Mutex mutex(signal, c_mutexMgr, dropTabPtr.p->m_define_backup_mutex);
mutex.unlock(); // ignore response
@@ -7426,7 +7513,8 @@ Dbdict::dropTable_commit(Signal* signal,
D("dropTable_commit" << *op_ptr.p);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, dropTabPtr.p->m_request.tableId);
+ bool ok = find_object(tablePtr, dropTabPtr.p->m_request.tableId);
+ ndbrequire(ok);
if (tablePtr.p->m_tablespace_id != RNIL)
{
@@ -7441,22 +7529,22 @@ Dbdict::dropTable_commit(Signal* signal,
char buf[1024];
LocalRope name(c_rope_pool, tablePtr.p->tableName);
name.copy(buf);
- g_eventLogger->info("Dbdict: drop name=%s,id=%u,obj_id=%u", buf, tablePtr.i,
+ g_eventLogger->info("Dbdict: drop name=%s,id=%u,obj_id=%u", buf, dropTabPtr.p->m_request.tableId,
tablePtr.p->m_obj_ptr_i);
}
send_event(signal, trans_ptr,
NDB_LE_DropSchemaObject,
- tablePtr.i,
+ dropTabPtr.p->m_request.tableId,
tablePtr.p->tableVersion,
tablePtr.p->tableType);
if (DictTabInfo::isIndex(tablePtr.p->tableType))
{
TableRecordPtr basePtr;
- c_tableRecordPool.getPtr(basePtr, tablePtr.p->primaryTableId);
-
- LocalTableRecord_list list(c_tableRecordPool, basePtr.p->m_indexes);
+ bool ok = find_object(basePtr, tablePtr.p->primaryTableId);
+ ndbrequire(ok);
+ LocalTableRecord_list list(c_tableRecordPool_, basePtr.p->m_indexes);
list.remove(tablePtr);
}
dropTabPtr.p->m_block = 0;
@@ -7602,9 +7690,6 @@ Dbdict::dropTable_complete(Signal* signa
DropTableRecPtr dropTabPtr;
getOpRec(op_ptr, dropTabPtr);
- TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, dropTabPtr.p->m_request.tableId);
-
dropTabPtr.p->m_block = 0;
dropTabPtr.p->m_blockNo[0] = DBTC;
dropTabPtr.p->m_blockNo[1] = DBLQH; // wait usage + LCP
@@ -7631,9 +7716,6 @@ Dbdict::dropTable_complete_nextStep(Sign
*/
ndbrequire(!hasError(op_ptr.p->m_error));
- TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
-
Uint32 block = dropTabPtr.p->m_block;
Uint32 blockNo = dropTabPtr.p->m_blockNo[block];
D("dropTable_complete_nextStep" << hex << V(blockNo) << *op_ptr.p);
@@ -7716,7 +7798,12 @@ Dbdict::dropTable_complete_done(Signal*
Uint32 tableId = dropTabPtr.p->m_request.tableId;
unlinkDictObject(op_ptr);
- releaseTableObject(tableId);
+ TableRecordPtr tablePtr;
+ bool ok = find_object(tablePtr, tableId);
+ if (ok)
+ {
+ releaseTableObject(tablePtr.i);
+ }
// inform SUMA
{
@@ -7925,6 +8012,7 @@ Dbdict::alterTable_parse(Signal* signal,
AlterTableRecPtr alterTabPtr;
getOpRec(op_ptr, alterTabPtr);
AlterTabReq* impl_req = &alterTabPtr.p->m_request;
+ Uint32 err;
if (AlterTableReq::getReorgSubOp(impl_req->changeMask))
{
@@ -7948,7 +8036,14 @@ Dbdict::alterTable_parse(Signal* signal,
setError(error, AlterTableRef::NoSuchTable, __LINE__);
return;
}
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ if (!ok)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+ return;
+ }
if (tablePtr.p->m_read_locked)
{
@@ -7957,7 +8052,7 @@ Dbdict::alterTable_parse(Signal* signal,
return;
}
- if (check_write_obj(tablePtr.i, trans_ptr.p->m_transId,
+ if (check_write_obj(impl_req->tableId, trans_ptr.p->m_transId,
SchemaFile::SF_ALTER, error))
{
jam();
@@ -8319,7 +8414,7 @@ Dbdict::alterTable_parse(Signal* signal,
te.m_gcp = 0;
te.m_transId = trans_ptr.p->m_transId;
- Uint32 err = trans_log_schema_op(op_ptr, impl_req->tableId, &te);
+ err = trans_log_schema_op(op_ptr, impl_req->tableId, &te);
if (err)
{
jam();
@@ -8438,8 +8533,9 @@ Dbdict::alterTable_subOps(Signal* signal
jam();
TableRecordPtr tabPtr;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(tabPtr, impl_req->tableId);
- LocalTableRecord_list list(c_tableRecordPool, tabPtr.p->m_indexes);
+ bool ok = find_object(tabPtr, impl_req->tableId);
+ ndbrequire(ok);
+ LocalTableRecord_list list(c_tableRecordPool_, tabPtr.p->m_indexes);
Uint32 ptrI = alterTabPtr.p->m_sub_add_frag_index_ptr;
if (ptrI == RNIL)
@@ -8586,7 +8682,8 @@ Dbdict::alterTable_toAlterIndex(Signal*
SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, alterTabPtr.p->m_sub_add_frag_index_ptr);
+ c_tableRecordPool_.getPtr(indexPtr, alterTabPtr.p->m_sub_add_frag_index_ptr);
+ ndbrequire(!indexPtr.isNull());
AlterIndxReq* req = (AlterIndxReq*)signal->getDataPtrSend();
req->clientRef = reference();
@@ -8713,8 +8810,6 @@ Dbdict::alterTable_toCreateTrigger(Signa
AlterTableRecPtr alterTablePtr;
getOpRec(op_ptr, alterTablePtr);
const AlterTabReq* impl_req = &alterTablePtr.p->m_request;
- TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
const TriggerTmpl& triggerTmpl = g_reorgTriggerTmpl[0];
@@ -8776,7 +8871,8 @@ Dbdict::alterTable_toCopyData(Signal* si
getOpRec(op_ptr, alterTablePtr);
const AlterTabReq* impl_req = &alterTablePtr.p->m_request;
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ ndbrequire(ok);
CopyDataReq* req = (CopyDataReq*)signal->getDataPtrSend();
@@ -8951,7 +9047,8 @@ Dbdict::alterTable_backup_mutex_locked(S
const AlterTabReq* impl_req = &alterTabPtr.p->m_request;
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ ndbrequire(ok);
Mutex mutex(signal, c_mutexMgr, alterTabPtr.p->m_define_backup_mutex);
mutex.unlock(); // ignore response
@@ -9152,7 +9249,8 @@ Dbdict::alterTable_commit(Signal* signal
D("alterTable_commit" << *op_ptr.p);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ ndbrequire(ok);
if (op_ptr.p->m_sections)
{
@@ -9398,7 +9496,8 @@ Dbdict::alterTable_fromCommitComplete(Si
const Uint32 tableId = impl_req->tableId;
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, tableId);
+ bool ok = find_object(tablePtr, tableId);
+ ndbrequire(ok);
// inform Suma so it can send events to any subscribers of the table
{
@@ -9783,7 +9882,7 @@ void Dbdict::execGET_TABLEDID_REQ(Signal
}
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, obj_ptr_p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, obj_ptr_p->m_object_ptr_i);
GetTableIdConf * conf = (GetTableIdConf *)req;
conf->tableId = tablePtr.p->tableId;
@@ -10112,19 +10211,19 @@ void Dbdict::sendOLD_LIST_TABLES_CONF(Si
TableRecordPtr tablePtr;
if (DictTabInfo::isTable(type) || DictTabInfo::isIndex(type)){
- c_tableRecordPool.getPtr(tablePtr, iter.curr.p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, iter.curr.p->m_object_ptr_i);
if(reqListIndexes && (reqTableId != tablePtr.p->primaryTableId))
continue;
conf->tableData[pos] = 0;
- conf->setTableId(pos, tablePtr.i); // id
+ conf->setTableId(pos, iter.curr.p->m_id); // id
conf->setTableType(pos, type); // type
// state
if(DictTabInfo::isTable(type))
{
- SchemaFile::TableEntry * te = getTableEntry(xsf, tablePtr.i);
+ SchemaFile::TableEntry * te = getTableEntry(xsf, iter.curr.p->m_id);
switch(te->m_tableState){
case SchemaFile::SF_CREATE:
jam();
@@ -10192,24 +10291,30 @@ void Dbdict::sendOLD_LIST_TABLES_CONF(Si
}
if(DictTabInfo::isTrigger(type)){
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, iter.curr.p->m_id);
-
+ bool ok = find_object(triggerPtr, iter.curr.p->m_id);
conf->tableData[pos] = 0;
- conf->setTableId(pos, triggerPtr.i);
+ conf->setTableId(pos, iter.curr.p->m_id);
conf->setTableType(pos, type);
- switch (triggerPtr.p->triggerState) {
- case TriggerRecord::TS_DEFINING:
- conf->setTableState(pos, DictTabInfo::StateBuilding);
- break;
- case TriggerRecord::TS_OFFLINE:
- conf->setTableState(pos, DictTabInfo::StateOffline);
- break;
- case TriggerRecord::TS_ONLINE:
- conf->setTableState(pos, DictTabInfo::StateOnline);
- break;
- default:
- conf->setTableState(pos, DictTabInfo::StateBroken);
- break;
+ if (!ok)
+ {
+ conf->setTableState(pos, DictTabInfo::StateBroken);
+ }
+ else
+ {
+ switch (triggerPtr.p->triggerState) {
+ case TriggerRecord::TS_DEFINING:
+ conf->setTableState(pos, DictTabInfo::StateBuilding);
+ break;
+ case TriggerRecord::TS_OFFLINE:
+ conf->setTableState(pos, DictTabInfo::StateOffline);
+ break;
+ case TriggerRecord::TS_ONLINE:
+ conf->setTableState(pos, DictTabInfo::StateOnline);
+ break;
+ default:
+ conf->setTableState(pos, DictTabInfo::StateBroken);
+ break;
+ }
}
conf->setTableStore(pos, DictTabInfo::StoreNotLogged);
pos++;
@@ -10335,18 +10440,18 @@ void Dbdict::sendLIST_TABLES_CONF(Signal
TableRecordPtr tablePtr;
if (DictTabInfo::isTable(type) || DictTabInfo::isIndex(type)){
- c_tableRecordPool.getPtr(tablePtr, iter.curr.p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, iter.curr.p->m_object_ptr_i);
if(reqListIndexes && (reqTableId != tablePtr.p->primaryTableId))
goto flush;
ltd.requestData = 0; // clear
- ltd.setTableId(tablePtr.i); // id
+ ltd.setTableId(iter.curr.p->m_id); // id
ltd.setTableType(type); // type
// state
if(DictTabInfo::isTable(type)){
- SchemaFile::TableEntry * te = getTableEntry(xsf, tablePtr.i);
+ SchemaFile::TableEntry * te = getTableEntry(xsf, iter.curr.p->m_id);
switch(te->m_tableState){
case SchemaFile::SF_CREATE:
jam();
@@ -10413,24 +10518,31 @@ void Dbdict::sendLIST_TABLES_CONF(Signal
}
if(DictTabInfo::isTrigger(type)){
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, iter.curr.p->m_id);
+ bool ok = find_object(triggerPtr, iter.curr.p->m_id);
ltd.requestData = 0;
- ltd.setTableId(triggerPtr.i);
+ ltd.setTableId(iter.curr.p->m_id);
ltd.setTableType(type);
- switch (triggerPtr.p->triggerState) {
- case TriggerRecord::TS_DEFINING:
- ltd.setTableState(DictTabInfo::StateBuilding);
- break;
- case TriggerRecord::TS_OFFLINE:
- ltd.setTableState(DictTabInfo::StateOffline);
- break;
- case TriggerRecord::TS_ONLINE:
- ltd.setTableState(DictTabInfo::StateOnline);
- break;
- default:
- ltd.setTableState(DictTabInfo::StateBroken);
- break;
+ if (!ok)
+ {
+ ltd.setTableState(DictTabInfo::StateBroken);
+ }
+ else
+ {
+ switch (triggerPtr.p->triggerState) {
+ case TriggerRecord::TS_DEFINING:
+ ltd.setTableState(DictTabInfo::StateBuilding);
+ break;
+ case TriggerRecord::TS_OFFLINE:
+ ltd.setTableState(DictTabInfo::StateOffline);
+ break;
+ case TriggerRecord::TS_ONLINE:
+ ltd.setTableState(DictTabInfo::StateOnline);
+ break;
+ default:
+ ltd.setTableState(DictTabInfo::StateBroken);
+ break;
+ }
}
ltd.setTableStore(DictTabInfo::StoreNotLogged);
}
@@ -10760,16 +10872,16 @@ Dbdict::createIndex_parse(Signal* signal
setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
return;
}
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ bool ok = find_object(tablePtr, impl_req->tableId);
+ if (!ok || !tablePtr.p->isTable()) {
- if (!tablePtr.p->isTable()) {
jam();
setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
return;
}
Uint32 err;
- if ((err = check_read_obj(tablePtr.i, trans_ptr.p->m_transId)))
+ if ((err = check_read_obj(impl_req->tableId, trans_ptr.p->m_transId)))
{
jam();
setError(error, err, __LINE__);
@@ -10975,8 +11087,8 @@ Dbdict::createIndex_toCreateTable(Signal
getOpRec(op_ptr, createIndexPtr);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, createIndexPtr.p->m_request.tableId);
- ndbrequire(tablePtr.i == tablePtr.p->tableId);
+ bool ok = find_object(tablePtr, createIndexPtr.p->m_request.tableId);
+ ndbrequire(ok);
// signal data writer
Uint32* wbuffer = &c_indexPage.word[0];
@@ -11448,21 +11560,30 @@ Dbdict::dropIndex_parse(Signal* signal,
SectionHandle& handle, ErrorInfo& error)
{
D("dropIndex_parse" << V(op_ptr.i) << *op_ptr.p);
+ jam();
SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
DropIndexRecPtr dropIndexPtr;
getOpRec(op_ptr, dropIndexPtr);
DropIndxImplReq* impl_req = &dropIndexPtr.p->m_request;
- TableRecordPtr indexPtr;
if (!(impl_req->indexId < c_noOfMetaTables)) {
jam();
setError(error, DropIndxRef::IndexNotFound, __LINE__);
return;
}
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
- if (!indexPtr.p->isIndex())
+ Uint32 err = check_read_obj(impl_req->indexId, trans_ptr.p->m_transId);
+ if (err)
+ {
+ jam();
+ setError(error, err, __LINE__);
+ return;
+ }
+
+ TableRecordPtr indexPtr;
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ if (!ok || !indexPtr.p->isIndex())
{
jam();
setError(error, DropIndxRef::NotAnIndex, __LINE__);
@@ -11476,16 +11597,21 @@ Dbdict::dropIndex_parse(Signal* signal,
return;
}
- if (check_write_obj(indexPtr.i, trans_ptr.p->m_transId,
+ if (check_write_obj(impl_req->indexId, trans_ptr.p->m_transId,
SchemaFile::SF_DROP, error))
{
jam();
return;
}
- ndbrequire(indexPtr.p->primaryTableId != RNIL);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, indexPtr.p->primaryTableId);
+ ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+ if (!ok)
+ {
+ jam();
+ setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
+ return;
+ }
// master sets primary table, slave verifies it agrees
if (master)
@@ -11942,7 +12068,21 @@ Dbdict::alterIndex_parse(Signal* signal,
setError(error, AlterIndxRef::IndexNotFound, __LINE__);
return;
}
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ if (check_read_obj(impl_req->indexId, trans_ptr.p->m_transId) == GetTabInfoRef::TableNotDefined)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+ return;
+ }
+ jam();
+
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ if (!ok)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+ return;
+ }
// get name for system index check later
char indexName[MAX_TAB_NAME_SIZE];
@@ -11959,7 +12099,7 @@ Dbdict::alterIndex_parse(Signal* signal,
return;
}
- if (check_write_obj(indexPtr.i, trans_ptr.p->m_transId,
+ if (check_write_obj(impl_req->indexId, trans_ptr.p->m_transId,
SchemaFile::SF_ALTER, error))
{
jam();
@@ -11995,7 +12135,8 @@ Dbdict::alterIndex_parse(Signal* signal,
ndbrequire(indexPtr.p->primaryTableId != RNIL);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, indexPtr.p->primaryTableId);
+ ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+ ndbrequire(ok); // TODO:msundell set error
// master sets primary table, participant verifies it agrees
if (master)
@@ -12122,7 +12263,9 @@ void
Dbdict::set_index_stat_frag(Signal* signal, TableRecordPtr indexPtr)
{
jam();
- const Uint32 indexId = indexPtr.i;
+ DictObjectPtr index_obj_ptr;
+ c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
+ const Uint32 indexId = index_obj_ptr.p->m_id;
Uint32 err = get_fragmentation(signal, indexId);
ndbrequire(err == 0);
// format: R F { fragId node1 .. nodeR } x { F }
@@ -12133,7 +12276,7 @@ Dbdict::set_index_stat_frag(Signal* sign
ndbrequire(noOfFragments != 0 && noOfReplicas != 0);
// distribute by table and index id
- const Uint32 value = indexPtr.p->primaryTableId + indexPtr.i;
+ const Uint32 value = indexPtr.p->primaryTableId + indexId;
const Uint32 fragId = value % noOfFragments;
const Uint32 fragIndex = 2 + (1 + noOfReplicas) * fragId;
const Uint32 nodeIndex = value % noOfReplicas;
@@ -12153,8 +12296,6 @@ Dbdict::alterIndex_subOps(Signal* signal
getOpRec(op_ptr, alterIndexPtr);
const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
Uint32 requestType = impl_req->requestType;
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
// ops to create or drop triggers
if (alterIndexPtr.p->m_sub_trigger == false)
@@ -12206,7 +12347,10 @@ Dbdict::alterIndex_subOps(Signal* signal
return true;
}
- if (indexPtr.p->isOrderedIndex() &&
+ TableRecordPtr indexPtr;
+ bool ok = find_object(indexPtr, impl_req->indexId);
+
+ if (ok && indexPtr.p->isOrderedIndex() &&
(!alterIndexPtr.p->m_sub_index_stat_dml ||
!alterIndexPtr.p->m_sub_index_stat_mon)) {
jam();
@@ -12232,7 +12376,8 @@ Dbdict::alterIndex_toCreateTrigger(Signa
getOpRec(op_ptr, alterIndexPtr);
const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
const TriggerTmpl& triggerTmpl = alterIndexPtr.p->m_triggerTmpl[0];
@@ -12326,7 +12471,8 @@ Dbdict::alterIndex_toDropTrigger(Signal*
const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
//const TriggerTmpl& triggerTmpl = alterIndexPtr.p->m_triggerTmpl[0];
@@ -12489,7 +12635,8 @@ Dbdict::alterIndex_toIndexStat(Signal* s
DictSignal::addRequestFlagsGlobal(requestInfo, op_ptr.p->m_requestInfo);
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
req->clientRef = reference();
req->clientData = op_ptr.p->op_key;
@@ -12593,7 +12740,8 @@ Dbdict::alterIndex_prepare(Signal* signa
Uint32 requestType = impl_req->requestType;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
D("alterIndex_prepare" << *op_ptr.p);
@@ -12653,7 +12801,8 @@ Dbdict::alterIndex_toCreateLocal(Signal*
const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
D("alterIndex_toCreateLocal" << *op_ptr.p);
@@ -12686,9 +12835,6 @@ Dbdict::alterIndex_toDropLocal(Signal* s
getOpRec(op_ptr, alterIndexPtr);
const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
D("alterIndex_toDropLocal" << *op_ptr.p);
DropIndxImplReq* req = (DropIndxImplReq*)signal->getDataPtrSend();
@@ -12865,7 +13011,12 @@ Dbdict::alterIndex_abortParse(Signal* si
}
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, indexId);
+ bool ok = find_object(indexPtr, indexId);
+ if (!ok)
+ {
+ jam();
+ break;
+ }
switch (requestType) {
case AlterIndxImplReq::AlterIndexOnline:
@@ -13105,20 +13256,30 @@ Dbdict::buildIndex_parse(Signal* signal,
SchemaOpPtr op_ptr,
SectionHandle& handle, ErrorInfo& error)
{
- D("buildIndex_parse");
+ D("buildIndex_parse");
+ SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
BuildIndexRecPtr buildIndexPtr;
getOpRec(op_ptr, buildIndexPtr);
BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
+ Uint32 err;
// get index
TableRecordPtr indexPtr;
- if (!(impl_req->indexId < c_noOfMetaTables)) {
+ err = check_read_obj(impl_req->indexId, trans_ptr.p->m_transId);
+ if (err)
+ {
jam();
- setError(error, BuildIndxRef::IndexNotFound, __LINE__);
+ setError(error, err, __LINE__);
+ return;
+ }
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ if (!ok)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
return;
}
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
ndbrequire(indexPtr.p->primaryTableId == impl_req->tableId);
@@ -13129,7 +13290,13 @@ Dbdict::buildIndex_parse(Signal* signal,
setError(error, BuildIndxRef::IndexNotFound, __LINE__);
return;
}
- c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ ok = find_object(tablePtr, impl_req->tableId);
+ if (!ok)
+ {
+ jam();
+ setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+ return;
+ }
// set attribute lists
getIndexAttrList(indexPtr, buildIndexPtr.p->m_indexKeyList);
@@ -13401,9 +13568,6 @@ Dbdict::buildIndex_toDropConstraint(Sign
getOpRec(op_ptr, buildIndexPtr);
const BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
const TriggerTmpl& triggerTmpl = buildIndexPtr.p->m_triggerTmpl[0];
DropTrigReq* req = (DropTrigReq*)signal->getDataPtrSend();
@@ -13488,9 +13652,6 @@ Dbdict::buildIndex_reply(Signal* signal,
D("buildIndex_reply" << V(impl_req->indexId));
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
if (!hasError(error)) {
BuildIndxConf* conf = (BuildIndxConf*)signal->getDataPtrSend();
conf->senderRef = reference();
@@ -13553,7 +13714,8 @@ Dbdict:: buildIndex_toLocalBuild(Signal*
SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
D("buildIndex_toLocalBuild");
@@ -13654,7 +13816,8 @@ Dbdict::buildIndex_toLocalOnline(Signal*
const BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
D("buildIndex_toLocalOnline");
@@ -13705,7 +13868,8 @@ Dbdict::buildIndex_fromLocalOnline(Signa
const BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
D("buildIndex_fromLocalOnline");
@@ -13880,20 +14044,23 @@ Dbdict::indexStat_parse(Signal* signal,
{
D("indexStat_parse");
+ SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
IndexStatRecPtr indexStatPtr;
getOpRec(op_ptr, indexStatPtr);
IndexStatImplReq* impl_req = &indexStatPtr.p->m_request;
+ Uint32 err;
// get index
TableRecordPtr indexPtr;
- if (!(impl_req->indexId < c_noOfMetaTables)) {
+ err = check_read_obj(impl_req->indexId, trans_ptr.p->m_transId);
+ if (err)
+ {
jam();
- setError(error, IndexStatRef::InvalidIndex, __LINE__);
+ setError(error, err, __LINE__);
return;
}
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
- if (!indexPtr.p->isOrderedIndex()) {
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ if (!ok || !indexPtr.p->isOrderedIndex()) {
jam();
setError(error, IndexStatRef::InvalidIndex, __LINE__);
return;
@@ -14023,7 +14190,8 @@ Dbdict::indexStat_toIndexStat(Signal* si
DictSignal::addRequestFlagsGlobal(requestInfo, op_ptr.p->m_requestInfo);
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
req->clientRef = reference();
req->clientData = op_ptr.p->op_key;
@@ -14084,9 +14252,6 @@ Dbdict::indexStat_reply(Signal* signal,
D("indexStat_reply" << V(impl_req->indexId));
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
if (!hasError(error)) {
IndexStatConf* conf = (IndexStatConf*)signal->getDataPtrSend();
conf->senderRef = reference();
@@ -14143,8 +14308,8 @@ Dbdict::indexStat_toLocalStat(Signal* si
D("indexStat_toLocalStat");
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
- ndbrequire(indexPtr.p->isOrderedIndex());
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok && indexPtr.p->isOrderedIndex());
Callback c = {
safe_cast(&Dbdict::indexStat_fromLocalStat),
@@ -14352,7 +14517,12 @@ Dbdict::execINDEX_STAT_REP(Signal* signa
jam();
return;
}
- c_tableRecordPool.getPtr(indexPtr, rep->indexId);
+ bool ok = find_object(indexPtr, rep->indexId);
+ if (!ok)
+ {
+ jam();
+ return;
+ }
if (rep->indexVersion != 0 &&
rep->indexVersion != indexPtr.p->tableVersion) {
jam();
@@ -14370,7 +14540,7 @@ Dbdict::execINDEX_STAT_REP(Signal* signa
D("index stat: " << copyRope<MAX_TAB_NAME_SIZE>(indexPtr.p->tableName)
<< " request type:" << rep->requestType);
- infoEvent("DICT: index %u stats auto-update requested", indexPtr.i);
+ infoEvent("DICT: index %u stats auto-update requested", rep->indexId);
indexPtr.p->indexStatBgRequest = rep->requestType;
}
@@ -14400,8 +14570,8 @@ Dbdict::indexStatBg_process(Signal* sign
jam();
continue;
}
- c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
- if (!indexPtr.p->isOrderedIndex()) {
+ bool ok = find_object(indexPtr, c_indexStatBgId);
+ if (!ok || !indexPtr.p->isOrderedIndex()) {
jam();
continue;
}
@@ -14437,15 +14607,16 @@ Dbdict::indexStatBg_fromBeginTrans(Signa
findTxHandle(tx_ptr, tx_key);
ndbrequire(!tx_ptr.isNull());
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
-
if (ret != 0) {
jam();
indexStatBg_sendContinueB(signal);
return;
}
+ TableRecordPtr indexPtr;
+ bool ok = find_object(indexPtr, c_indexStatBgId);
+ ndbrequire(ok);
+
Callback c = {
safe_cast(&Dbdict::indexStatBg_fromIndexStat),
tx_ptr.p->tx_key
@@ -14475,13 +14646,10 @@ Dbdict::indexStatBg_fromIndexStat(Signal
findTxHandle(tx_ptr, tx_key);
ndbrequire(!tx_ptr.isNull());
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
-
if (ret != 0) {
jam();
setError(tx_ptr.p->m_error, ret, __LINE__);
- warningEvent("DICT: index %u stats auto-update error: %d", indexPtr.i, ret);
+ warningEvent("DICT: index %u stats auto-update error: %d", c_indexStatBgId, ret);
}
Callback c = {
@@ -14506,17 +14674,18 @@ Dbdict::indexStatBg_fromEndTrans(Signal*
ndbrequire(!tx_ptr.isNull());
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
+ bool ok = find_object(indexPtr, c_indexStatBgId);
if (ret != 0) {
jam();
// skip over but leave the request on
- warningEvent("DICT: index %u stats auto-update error: %d", indexPtr.i, ret);
+ warningEvent("DICT: index %u stats auto-update error: %d", c_indexStatBgId, ret);
} else {
jam();
+ ndbrequire(ok);
// mark request done
indexPtr.p->indexStatBgRequest = 0;
- infoEvent("DICT: index %u stats auto-update done", indexPtr.i);
+ infoEvent("DICT: index %u stats auto-update done", c_indexStatBgId);
}
releaseTxHandle(tx_ptr);
@@ -14701,7 +14870,8 @@ Dbdict::copyData_prepare(Signal* signal,
Uint32 tmp[MAX_ATTRIBUTES_IN_TABLE];
bool tabHasDiskCols = false;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, impl_req->srcTableId);
+ bool ok = find_object(tabPtr, impl_req->srcTableId);
+ ndbrequire(ok);
{
LocalAttributeRecord_list alist(c_attributeRecordPool,
tabPtr.p->m_attributes);
@@ -14802,7 +14972,8 @@ Dbdict::copyData_complete(Signal* signal
Uint32 tmp[MAX_ATTRIBUTES_IN_TABLE];
bool tabHasDiskCols = false;
TableRecordPtr tabPtr;
- c_tableRecordPool.getPtr(tabPtr, impl_req->srcTableId);
+ bool ok = find_object(tabPtr, impl_req->srcTableId);
+ ndbrequire(ok);
{
LocalAttributeRecord_list alist(c_attributeRecordPool,
tabPtr.p->m_attributes);
@@ -15073,7 +15244,7 @@ Dbdict::prepareTransactionEventSysTable
ndbrequire(opj_ptr_p != 0);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, opj_ptr_p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, opj_ptr_p->m_object_ptr_i);
ndbrequire(tablePtr.i != RNIL); // system table must exist
Uint32 tableId = tablePtr.p->tableId; /* System table */
@@ -15691,7 +15862,7 @@ void Dbdict::executeTransEventSysTable(C
ndbrequire(opj_ptr_p != 0);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, opj_ptr_p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, opj_ptr_p->m_object_ptr_i);
ndbrequire(tablePtr.i != RNIL); // system table must exist
Uint32 noAttr = tablePtr.p->noOfAttributes;
@@ -15871,7 +16042,7 @@ void Dbdict::parseReadEventSys(Signal* s
ndbrequire(opj_ptr_p != 0);
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, opj_ptr_p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, opj_ptr_p->m_object_ptr_i);
ndbrequire(tablePtr.i != RNIL); // system table must exist
Uint32 noAttr = tablePtr.p->noOfAttributes;
@@ -15961,7 +16132,7 @@ void Dbdict::createEventUTIL_EXECUTE(Sig
}
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, obj_ptr_p->m_id);
+ c_tableRecordPool_.getPtr(tablePtr, obj_ptr_p->m_object_ptr_i);
evntRec->m_request.setTableId(tablePtr.p->tableId);
evntRec->m_request.setTableVersion(tablePtr.p->tableVersion);
@@ -17735,28 +17906,33 @@ Dbdict::createTrigger_parse(Signal* sign
impl_req->triggerId = getFreeTriggerRecord();
if (impl_req->triggerId == RNIL)
{
- jam();
- setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
- return;
+ jam();
+ setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
+ return;
+ }
+ bool ok = find_object(triggerPtr, impl_req->triggerId);
+ if (ok)
+ {
+ jam();
+ setError(error, CreateTrigRef::TriggerExists, __LINE__);
+ return;
}
- c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
- ndbrequire(triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED);
D("master allocated triggerId " << impl_req->triggerId);
}
else
{
- if (!(impl_req->triggerId < c_triggerRecordPool.getSize()))
+ if (!(impl_req->triggerId < c_triggerRecordPool_.getSize()))
{
jam();
setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
return;
}
- c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
- if (triggerPtr.p->triggerState != TriggerRecord::TS_NOT_DEFINED)
+ bool ok = find_object(triggerPtr, impl_req->triggerId);
+ if (ok)
{
- jam();
- setError(error, CreateTrigRef::TriggerExists, __LINE__);
- return;
+ jam();
+ setError(error, CreateTrigRef::TriggerExists, __LINE__);
+ return;
}
D("master forced triggerId " << impl_req->triggerId);
}
@@ -17765,14 +17941,14 @@ Dbdict::createTrigger_parse(Signal* sign
{
jam();
// slave receives trigger id from master
- if (! (impl_req->triggerId < c_triggerRecordPool.getSize()))
+ if (! (impl_req->triggerId < c_triggerRecordPool_.getSize()))
{
jam();
setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
return;
}
- c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
- if (triggerPtr.p->triggerState != TriggerRecord::TS_NOT_DEFINED)
+ bool ok = find_object(triggerPtr, impl_req->triggerId);
+ if (ok)
{
jam();
setError(error, CreateTrigRef::TriggerExists, __LINE__);
@@ -17781,15 +17957,20 @@ Dbdict::createTrigger_parse(Signal* sign
D("slave allocated triggerId " << hex << impl_req->triggerId);
}
- initialiseTriggerRecord(triggerPtr);
-
- triggerPtr.p->triggerId = impl_req->triggerId;
+ bool ok = seizeTriggerRecord(triggerPtr, impl_req->triggerId);
+ if (!ok)
+ {
+ jam();
+ setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
+ return;
+ }
triggerPtr.p->tableId = impl_req->tableId;
triggerPtr.p->indexId = RNIL; // feedback method connects to index
triggerPtr.p->triggerInfo = impl_req->triggerInfo;
triggerPtr.p->receiverRef = impl_req->receiverRef;
triggerPtr.p->triggerState = TriggerRecord::TS_DEFINING;
+ // TODO:msundell on failure below, leak of TriggerRecord
if (handle.m_cnt >= 2)
{
jam();
@@ -17829,12 +18010,13 @@ Dbdict::createTrigger_parse(Signal* sign
// connect to new DictObject
{
DictObjectPtr obj_ptr;
- seizeDictObject(op_ptr, obj_ptr, triggerPtr.p->triggerName);
+ seizeDictObject(op_ptr, obj_ptr, triggerPtr.p->triggerName); // added to c_obj_name_hash
obj_ptr.p->m_id = impl_req->triggerId; // wl3600_todo id
obj_ptr.p->m_type =
TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
- triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
+ link_object(obj_ptr, triggerPtr);
+ c_obj_id_hash.add(obj_ptr);
}
{
@@ -17875,7 +18057,8 @@ Dbdict::createTrigger_parse(Signal* sign
if (impl_req->indexId != RNIL)
{
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+ bool ok = find_object(indexPtr, impl_req->indexId);
+ ndbrequire(ok);
triggerPtr.p->indexId = impl_req->indexId;
indexPtr.p->triggerId = impl_req->triggerId;
}
@@ -17911,7 +18094,12 @@ Dbdict::createTrigger_parse_endpoint(Sig
}
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
+ bool ok = find_object(triggerPtr, impl_req->triggerId);
+ if (!ok)
+ {
+ jam();
+ return;
+ }
switch(TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo)){
case TriggerType::REORG_TRIGGER:
jam();
@@ -18226,8 +18414,8 @@ Dbdict::createTrigger_commit(Signal* sig
Uint32 triggerId = impl_req->triggerId;
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-
+ bool ok = find_object(triggerPtr, triggerId);
+ ndbrequire(ok);
triggerPtr.p->triggerState = TriggerRecord::TS_ONLINE;
unlinkDictObject(op_ptr);
}
@@ -18285,26 +18473,30 @@ Dbdict::createTrigger_abortParse(Signal*
jam();
TriggerRecordPtr triggerPtr;
- if (! (triggerId < c_triggerRecordPool.getSize()))
+ if (! (triggerId < c_triggerRecordPool_.getSize()))
{
jam();
goto done;
}
- c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-
- if (triggerPtr.p->triggerState == TriggerRecord::TS_DEFINING)
+ bool ok = find_object(triggerPtr, triggerId);
+ if (ok)
{
jam();
- triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
- }
- if (triggerPtr.p->indexId != RNIL)
- {
- TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, triggerPtr.p->indexId);
- triggerPtr.p->indexId = RNIL;
- indexPtr.p->triggerId = RNIL;
+ if (triggerPtr.p->indexId != RNIL)
+ {
+ TableRecordPtr indexPtr;
+ bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+ if (ok)
+ {
+ jam();
+ indexPtr.p->triggerId = RNIL;
+ }
+ triggerPtr.p->indexId = RNIL;
+ }
+
+ c_triggerRecordPool_.release(triggerPtr);
}
// ignore Feedback for now (referencing object will be dropped too)
@@ -18390,8 +18582,8 @@ Dbdict::send_create_trig_req(Signal* sig
const CreateTrigImplReq* impl_req = &createTriggerPtr.p->m_request;
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
-
+ bool ok = find_object(triggerPtr, impl_req->triggerId);
+ ndbrequire(ok);
D("send_create_trig_req");
CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
@@ -18421,7 +18613,8 @@ Dbdict::send_create_trig_req(Signal* sig
if (triggerPtr.p->indexId != RNIL)
{
jam();
- c_tableRecordPool.getPtr(indexPtr, triggerPtr.p->indexId);
+ bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+ ndbrequire(ok);
if (indexPtr.p->m_upgrade_trigger_handling.m_upgrade)
{
jam();
@@ -18636,12 +18829,18 @@ Dbdict::dropTrigger_parse(Signal* signal
// check trigger id from user or via name
TriggerRecordPtr triggerPtr;
{
- if (!(impl_req->triggerId < c_triggerRecordPool.getSize())) {
+ if (!(impl_req->triggerId < c_triggerRecordPool_.getSize())) {
+ jam();
+ setError(error, DropTrigImplRef::TriggerNotFound, __LINE__);
+ return;
+ }
+ bool ok = find_object(triggerPtr, impl_req->triggerId);
+ if (!ok)
+ {
jam();
setError(error, DropTrigImplRef::TriggerNotFound, __LINE__);
return;
}
- c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
// wl3600_todo state check
}
@@ -18952,19 +19151,24 @@ Dbdict::dropTrigger_commit(Signal* signa
Uint32 triggerId = dropTriggerPtr.p->m_request.triggerId;
TriggerRecordPtr triggerPtr;
- c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-
+ bool ok = find_object(triggerPtr, triggerId);
+ ndbrequire(ok);
if (triggerPtr.p->indexId != RNIL)
{
+ jam();
TableRecordPtr indexPtr;
- c_tableRecordPool.getPtr(indexPtr, triggerPtr.p->indexId);
+ bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+ if (ok)
+ {
+ jam();
+ indexPtr.p->triggerId = RNIL;
+ }
triggerPtr.p->indexId = RNIL;
- indexPtr.p->triggerId = RNIL;
}
// remove trigger
+ c_triggerRecordPool_.release(triggerPtr);
releaseDictObject(op_ptr);
- triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
sendTransConf(signal, op_ptr);
return;
@@ -19139,7 +19343,8 @@ Dbdict::getIndexAttr(TableRecordPtr inde
TableRecordPtr tablePtr;
AttributeRecordPtr attrPtr;
- c_tableRecordPool.getPtr(tablePtr, indexPtr.p->primaryTableId);
+ bool ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+ ndbrequire(ok);
AttributeRecord* iaRec = c_attributeRecordPool.getPtr(itAttr);
{
ConstRope tmp(c_rope_pool, iaRec->attributeName);
@@ -20403,10 +20608,14 @@ Dbdict::execBACKUP_LOCK_TAB_REQ(Signal*
Uint32 lock = req->m_lock_unlock;
TableRecordPtr tablePtr;
- c_tableRecordPool.getPtr(tablePtr, tableId, true);
-
+ bool ok = find_object(tablePtr, tableId);
Uint32 err = 0;
- if(lock == BackupLockTab::LOCK_TABLE)
+ if (!ok)
+ {
+ jam();
+ err = GetTabInfoRef::InvalidTableId;
+ }
+ else if(lock == BackupLockTab::LOCK_TABLE)
{
jam();
if ((err = check_write_obj(tableId)) == 0)
@@ -29005,7 +29214,7 @@ Dbdict::check_consistency()
tablePtr.i++) {
if (check_read_obj(tablePtr.i,
- c_tableRecordPool.getPtr(tablePtr);
+ c_tableRecordPool_.getPtr(tablePtr);
switch (tablePtr.p->tabState) {
case TableRecord::NOT_DEFINED:
@@ -29019,10 +29228,11 @@ Dbdict::check_consistency()
// triggers // should be in schema file
TriggerRecordPtr triggerPtr;
- for (triggerPtr.i = 0;
- triggerPtr.i < c_triggerRecordPool.getSize();
- triggerPtr.i++) {
- c_triggerRecordPool.getPtr(triggerPtr);
+ for (Uint32 id = 0;
+ id < c_triggerRecordPool_.getSize();
+ id++) {
+ bool ok = find_object(triggerPtr, id);
+ if (!ok) continue;
switch (triggerPtr.p->triggerState) {
case TriggerRecord::TS_NOT_DEFINED:
continue;
@@ -29069,7 +29279,6 @@ void
Dbdict::check_consistency_table(TableRecordPtr tablePtr)
{
D("table " << copyRope<SZ>(tablePtr.p->tableName));
- ndbrequire(tablePtr.p->tableId == tablePtr.i);
switch (tablePtr.p->tableType) {
case DictTabInfo::SystemTable: // should just be "Table"
@@ -29111,9 +29320,8 @@ Dbdict::check_consistency_index(TableRec
}
TableRecordPtr tablePtr;
- tablePtr.i = indexPtr.p->primaryTableId;
- ndbrequire(tablePtr.i != RNIL);
- c_tableRecordPool.getPtr(tablePtr);
+ bool ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+ ndbrequire(ok);
check_consistency_table(tablePtr);
bool is_unique_index = false;
@@ -29131,13 +29339,10 @@ Dbdict::check_consistency_index(TableRec
}
TriggerRecordPtr triggerPtr;
- triggerPtr.i = indexPtr.p->triggerId;
- ndbrequire(triggerPtr.i != RNIL);
- c_triggerRecordPool.getPtr(triggerPtr);
-
+ ok = find_object(triggerPtr, indexPtr.p->triggerId);
+ ndbrequire(ok);
ndbrequire(triggerPtr.p->tableId == tablePtr.p->tableId);
ndbrequire(triggerPtr.p->indexId == indexPtr.p->tableId);
- ndbrequire(triggerPtr.p->triggerId == triggerPtr.i);
check_consistency_trigger(triggerPtr);
@@ -29163,21 +29368,19 @@ Dbdict::check_consistency_trigger(Trigge
{
ndbrequire(triggerPtr.p->triggerState == TriggerRecord::TS_ONLINE);
}
- ndbrequire(triggerPtr.p->triggerId == triggerPtr.i);
TableRecordPtr tablePtr;
- tablePtr.i = triggerPtr.p->tableId;
- ndbrequire(tablePtr.i != RNIL);
- c_tableRecordPool.getPtr(tablePtr);
+ bool ok = find_object(tablePtr, triggerPtr.p->tableId);
+ ndbrequire(ok);
check_consistency_table(tablePtr);
if (triggerPtr.p->indexId != RNIL)
{
jam();
TableRecordPtr indexPtr;
- indexPtr.i = triggerPtr.p->indexId;
- c_tableRecordPool.getPtr(indexPtr);
- ndbrequire(check_read_obj(indexPtr.i) == 0);
+ ndbrequire(check_read_obj(triggerPtr.p->indexId) == 0);
+ bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+ ndbrequire(ok);
ndbrequire(indexPtr.p->indexState == TableRecord::IS_ONLINE);
TriggerInfo ti;
TriggerInfo::unpackTriggerInfo(triggerPtr.p->triggerInfo, ti);
@@ -29185,7 +29388,7 @@ Dbdict::check_consistency_trigger(Trigge
case TriggerEvent::TE_CUSTOM:
if (! (triggerPtr.p->triggerState == TriggerRecord::TS_FAKE_UPGRADE))
{
- ndbrequire(triggerPtr.i == indexPtr.p->triggerId);
+ ndbrequire(triggerPtr.p->triggerId == indexPtr.p->triggerId);
}
break;
default:
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2011-11-11 13:31:19 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2011-11-19 16:53:16 +0000
@@ -437,9 +437,9 @@ public:
Uint32 indexStatBgRequest;
};
- TableRecord_pool c_tableRecordPool;
- RSS_AP_SNAPSHOT(c_tableRecordPool);
- TableRecord_pool& get_pool(TableRecordPtr) { return c_tableRecordPool; }
+ TableRecord_pool c_tableRecordPool_;
+ RSS_AP_SNAPSHOT(c_tableRecordPool_);
+ TableRecord_pool& get_pool(TableRecordPtr) { return c_tableRecordPool_; }
/** Node Group and Tablespace id+version + range or list data.
* This is only stored temporarily in DBDICT during an ongoing
@@ -459,6 +459,7 @@ public:
*/
struct TriggerRecord {
TriggerRecord() {}
+ static bool isCompatible(Uint32 type) { return DictTabInfo::isTrigger(type); }
/** Trigger state */
enum TriggerState {
@@ -505,8 +506,9 @@ public:
typedef ArrayPool<TriggerRecord> TriggerRecord_pool;
Uint32 c_maxNoOfTriggers;
- TriggerRecord_pool c_triggerRecordPool;
- RSS_AP_SNAPSHOT(c_triggerRecordPool);
+ TriggerRecord_pool c_triggerRecordPool_;
+ TriggerRecord_pool& get_pool(TriggerRecordPtr) { return c_triggerRecordPool_;}
+ RSS_AP_SNAPSHOT(c_triggerRecordPool_);
/**
* Information for each FS connection.
@@ -771,6 +773,17 @@ public:
return find_object(obj, object, id);
}
+ bool find_object(DictObjectPtr& obj, Ptr<TriggerRecord>& object, Uint32 id)
+ {
+ if (!find_trigger_object(obj, id))
+ {
+ object.setNull();
+ return false;
+ }
+ get_pool(object).getPtr(object, obj.p->m_object_ptr_i);
+ return !object.isNull();
+ }
+
bool find_object(DictObjectPtr& object, Uint32 id)
{
DictObject key;
@@ -780,6 +793,15 @@ public:
return ok;
}
+ bool find_trigger_object(DictObjectPtr& object, Uint32 id)
+ {
+ DictObject key;
+ key.m_id = id;
+ key.m_type = DictTabInfo::HashIndexTrigger; // A trigger type
+ bool ok = c_obj_id_hash.find(object, key);
+ return ok;
+ }
+
template<typename T> bool link_object(DictObjectPtr obj, Ptr<T> object)
{
if (!T::isCompatible(obj.p->m_type))
@@ -3728,14 +3750,17 @@ private:
/* ------------------------------------------------------------ */
// Drop Table Handling
/* ------------------------------------------------------------ */
- void releaseTableObject(Uint32 tableId, bool removeFromHash = true);
+ void releaseTableObject(Uint32 table_ptr_i, bool removeFromHash = true);
/* ------------------------------------------------------------ */
// General Stuff
/* ------------------------------------------------------------ */
Uint32 getFreeObjId(bool both = false);
Uint32 getFreeTableRecord();
+ bool seizeTableRecord(TableRecordPtr& tableRecord, Uint32& schemaFileId);
Uint32 getFreeTriggerRecord();
+ bool seizeTriggerRecord(TriggerRecordPtr& tableRecord, Uint32 triggerId);
+ void releaseTriggerObject(Uint32 trigger_ptr_i);
bool getNewAttributeRecord(TableRecordPtr tablePtr,
AttributeRecordPtr & attrPtr);
void packTableIntoPages(Signal* signal);
@@ -3988,10 +4013,8 @@ private:
void initWriteSchemaRecord();
void initNodeRecords();
- void initTableRecords();
- void initialiseTableRecord(TableRecordPtr tablePtr);
- void initTriggerRecords();
- void initialiseTriggerRecord(TriggerRecordPtr triggerPtr);
+ void initialiseTableRecord(TableRecordPtr tablePtr, Uint32 tableId);
+ void initialiseTriggerRecord(TriggerRecordPtr triggerPtr, Uint32 triggerId);
void initPageRecords();
Uint32 getFsConnRecord();
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-11-16 09:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-11-21 12:53:01 +0000
@@ -7486,7 +7486,8 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
const Uint32 defaultFragments =
c_fragments_per_node * cnoOfNodeGroups * cnoReplicas;
const Uint32 maxFragments =
- MAX_FRAG_PER_LQH * getLqhWorkers() * cnoOfNodeGroups * cnoReplicas;
+ MAX_FRAG_PER_LQH * (getLqhWorkers() ? getLqhWorkers() : 1) *
+ cnoOfNodeGroups * cnoReplicas;
do {
NodeGroupRecordPtr NGPtr;
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2011-11-16 05:47:02 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2011-11-18 06:47:23 +0000
@@ -36,7 +36,9 @@ void Dblqh::initData()
clcpFileSize = ZNO_CONCURRENT_LCP;
clfoFileSize = 0;
clogFileFileSize = 0;
- clogPartFileSize = 0; // Not valid until READ_CONFIG
+
+ NdbLogPartInfo lpinfo(instance());
+ clogPartFileSize = lpinfo.partCount;
cpageRefFileSize = ZPAGE_REF_FILE_SIZE;
cscanrecFileSize = 0;
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-11-16 09:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-11-21 12:53:01 +0000
@@ -1220,36 +1220,31 @@ void Dblqh::execREAD_CONFIG_REQ(Signal*
m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
- clogPartFileSize = 4;
- Uint32 nodeLogParts = 4;
- ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS,
- &nodeLogParts);
- globalData.ndbLogParts = nodeLogParts;
- ndbrequire(nodeLogParts <= NDB_MAX_LOG_PARTS);
- {
- NdbLogPartInfo lpinfo(instance());
- clogPartFileSize = lpinfo.partCount; // How many are this instance responsible for...
- }
-
- if (globalData.ndbMtLqhWorkers > nodeLogParts)
+ /**
+ * TODO move check of log-parts vs. ndbMtLqhWorkers to better place
+ * (Configuration.cpp ??)
+ */
+ ndbrequire(globalData.ndbLogParts <= NDB_MAX_LOG_PARTS);
+ if (globalData.ndbMtLqhWorkers > globalData.ndbLogParts)
{
char buf[255];
BaseString::snprintf(buf, sizeof(buf),
"Trying to start %d LQH workers with only %d log parts, try initial"
" node restart to be able to use more LQH workers.",
- globalData.ndbMtLqhWorkers, nodeLogParts);
+ globalData.ndbMtLqhWorkers, globalData.ndbLogParts);
progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
}
- if (nodeLogParts != 4 &&
- nodeLogParts != 8 &&
- nodeLogParts != 16)
+
+ if (globalData.ndbLogParts != 4 &&
+ globalData.ndbLogParts != 8 &&
+ globalData.ndbLogParts != 16)
{
char buf[255];
BaseString::snprintf(buf, sizeof(buf),
"Trying to start with %d log parts, number of log parts can"
" only be set to 4, 8 or 16.",
- nodeLogParts);
+ globalData.ndbLogParts);
progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
}
@@ -1280,7 +1275,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal*
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TABLE, &ctabrecFileSize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TC_CONNECT,
&ctcConnectrecFileSize));
- clogFileFileSize = 4 * cnoLogFiles;
+ clogFileFileSize = clogPartFileSize * cnoLogFiles;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
@@ -15926,6 +15921,7 @@ void Dblqh::initLogpage(Signal* signal)
logPagePtr.p->logPageWord[ZPOS_VERSION] = NDB_VERSION;
logPagePtr.p->logPageWord[ZPOS_NO_LOG_FILES] = logPartPtr.p->noLogFiles;
logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = ZPAGE_HEADER_SIZE;
+ logPagePtr.p->logPageWord[ZPOS_NO_LOG_PARTS]= globalData.ndbLogParts;
ilpTcConnectptr.i = logPartPtr.p->firstLogTcrec;
if (ilpTcConnectptr.i != RNIL) {
jam();
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-11-18 06:47:23 +0000
@@ -1769,6 +1769,7 @@ private:
Uint64 cabortCount;
Uint64 c_scan_count;
Uint64 c_range_scan_count;
+ Uint64 clocalReadCount;
// Resource usage counter(not monotonic)
Uint32 cconcurrentOp;
@@ -1783,6 +1784,7 @@ private:
cabortCount(0),
c_scan_count(0),
c_range_scan_count(0),
+ clocalReadCount(0),
cconcurrentOp(0) {}
Uint32 build_event_rep(Signal* signal)
@@ -1800,6 +1802,7 @@ private:
const Uint32 abortCount = diff(signal, 13, cabortCount);
const Uint32 scan_count = diff(signal, 15, c_scan_count);
const Uint32 range_scan_count = diff(signal, 17, c_range_scan_count);
+ const Uint32 localread_count = diff(signal, 19, clocalReadCount);
signal->theData[0] = NDB_LE_TransReportCounters;
signal->theData[1] = transCount;
@@ -1812,7 +1815,8 @@ private:
signal->theData[8] = abortCount;
signal->theData[9] = scan_count;
signal->theData[10] = range_scan_count;
- return 11;
+ signal->theData[11] = localread_count;
+ return 12;
}
Uint32 build_continueB(Signal* signal) const
@@ -1821,7 +1825,9 @@ private:
const Uint64* vars[] = {
&cattrinfoCount, &ctransCount, &ccommitCount,
&creadCount, &csimpleReadCount, &cwriteCount,
- &cabortCount, &c_scan_count, &c_range_scan_count };
+ &cabortCount, &c_scan_count, &c_range_scan_count,
+ &clocalReadCount
+ };
const size_t num = sizeof(vars)/sizeof(vars[0]);
for (size_t i = 0; i < num; i++)
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-11-18 06:47:23 +0000
@@ -3356,7 +3356,10 @@ void Dbtc::tckeyreq050Lab(Signal* signal
jam();
regTcPtr->lastReplicaNo = 0;
regTcPtr->noOfNodes = 1;
- }
+
+ if (regTcPtr->tcNodedata[0] == getOwnNodeId())
+ c_counters.clocalReadCount++;
+ }
else if (Toperation == ZUNLOCK)
{
regTcPtr->m_special_op_flags &= ~TcConnectRecord::SOF_REORG_MOVING;
@@ -13260,7 +13263,8 @@ void Dbtc::execDBINFO_SCANREQ(Signal *si
{ Ndbinfo::WRITES_COUNTER, c_counters.cwriteCount },
{ Ndbinfo::ABORTS_COUNTER, c_counters.cabortCount },
{ Ndbinfo::TABLE_SCANS_COUNTER, c_counters.c_scan_count },
- { Ndbinfo::RANGE_SCANS_COUNTER, c_counters.c_range_scan_count }
+ { Ndbinfo::RANGE_SCANS_COUNTER, c_counters.c_range_scan_count },
+ { Ndbinfo::LOCAL_READ_COUNTER, c_counters.clocalReadCount }
};
const size_t num_counters = sizeof(counters) / sizeof(counters[0]);
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2011-11-18 06:47:23 +0000
@@ -275,6 +275,22 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
Uint32 noIdleFiles = 27;
ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
+
+ {
+ /**
+ * each logpart keeps up to 3 logfiles open at any given time...
+ * (bound)
+ * make sure noIdleFiles is atleast 4 times #logparts
+ */
+ Uint32 logParts = NDB_DEFAULT_LOG_PARTS;
+ ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS, &logParts);
+ Uint32 logfiles = 4 * logParts;
+ if (noIdleFiles < logfiles)
+ {
+ noIdleFiles = logfiles;
+ }
+ }
+
// Make sure at least "noIdleFiles" files can be created
if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
m_maxFiles = noIdleFiles;
=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp 2011-11-19 16:53:16 +0000
@@ -2462,7 +2462,7 @@ Trix::statCleanExecute(Signal* signal, S
ndbrequire(data.m_indexVersion == av[1]);
data.m_sampleVersion = av[2];
data.m_statKey = &av[3];
- const char* kp = (const char*)data.m_statKey;
+ const unsigned char* kp = (const unsigned char*)data.m_statKey;
const Uint32 kb = kp[0] + (kp[1] << 8);
// key is not empty
ndbrequire(kb != 0);
@@ -2633,8 +2633,8 @@ Trix::statScanExecute(Signal* signal, St
::copy(av, ptr1);
data.m_statKey = &av[0];
data.m_statValue = &av[kz];
- const char* kp = (const char*)data.m_statKey;
- const char* vp = (const char*)data.m_statValue;
+ const unsigned char* kp = (const unsigned char*)data.m_statKey;
+ const unsigned char* vp = (const unsigned char*)data.m_statValue;
const Uint32 kb = kp[0] + (kp[1] << 8);
const Uint32 vb = vp[0] + (vp[1] << 8);
// key and value are not empty
=== added file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2011-11-16 15:38:25 +0000
@@ -0,0 +1,648 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "trpman.hpp"
+#include <TransporterRegistry.hpp>
+#include <signaldata/CloseComReqConf.hpp>
+#include <signaldata/DisconnectRep.hpp>
+#include <signaldata/EnableCom.hpp>
+#include <signaldata/RouteOrd.hpp>
+#include <signaldata/DumpStateOrd.hpp>
+
+Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
+ SimulatedBlock(TRPMAN, ctx, instanceno)
+{
+ BLOCK_CONSTRUCTOR(Trpman);
+
+ addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
+ addRecSignal(GSN_OPEN_COMREQ, &Trpman::execOPEN_COMREQ);
+ addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
+ addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
+ addRecSignal(GSN_CONNECT_REP, &Trpman::execCONNECT_REP);
+ addRecSignal(GSN_ROUTE_ORD, &Trpman::execROUTE_ORD);
+
+ addRecSignal(GSN_NDB_TAMPER, &Trpman::execNDB_TAMPER, true);
+ addRecSignal(GSN_DUMP_STATE_ORD, &Trpman::execDUMP_STATE_ORD);
+ addRecSignal(GSN_DBINFO_SCANREQ, &Trpman::execDBINFO_SCANREQ);
+}
+
+Trpman::~Trpman()
+{
+}
+
+BLOCK_FUNCTIONS(Trpman)
+
+#ifdef ERROR_INSERT
+NodeBitmask c_error_9000_nodes_mask;
+extern Uint32 MAX_RECEIVED_SIGNALS;
+#endif
+
+void
+Trpman::execOPEN_COMREQ(Signal* signal)
+{
+ // Connect to the specifed NDB node, only QMGR allowed communication
+ // so far with the node
+
+ const BlockReference userRef = signal->theData[0];
+ Uint32 tStartingNode = signal->theData[1];
+ Uint32 tData2 = signal->theData[2];
+ jamEntry();
+
+ const Uint32 len = signal->getLength();
+ if (len == 2)
+ {
+#ifdef ERROR_INSERT
+ if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
+ && c_error_9000_nodes_mask.get(tStartingNode)))
+#endif
+ {
+ globalTransporterRegistry.do_connect(tStartingNode);
+ globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
+
+ //-----------------------------------------------------
+ // Report that the connection to the node is opened
+ //-----------------------------------------------------
+ signal->theData[0] = NDB_LE_CommunicationOpened;
+ signal->theData[1] = tStartingNode;
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+ //-----------------------------------------------------
+ }
+ }
+ else
+ {
+ for(unsigned int i = 1; i < MAX_NODES; i++ )
+ {
+ jam();
+ if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2)
+ {
+ jam();
+
+#ifdef ERROR_INSERT
+ if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
+ && c_error_9000_nodes_mask.get(i))
+ continue;
+#endif
+ globalTransporterRegistry.do_connect(i);
+ globalTransporterRegistry.setIOState(i, HaltIO);
+
+ signal->theData[0] = NDB_LE_CommunicationOpened;
+ signal->theData[1] = i;
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+ }
+ }
+ }
+
+ if (userRef != 0)
+ {
+ jam();
+ signal->theData[0] = tStartingNode;
+ signal->theData[1] = tData2;
+ sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA);
+ }
+}
+
+void
+Trpman::execCONNECT_REP(Signal *signal)
+{
+ const Uint32 hostId = signal->theData[0];
+ jamEntry();
+
+ const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type;
+ ndbrequire(type != NodeInfo::INVALID);
+ globalData.m_nodeInfo[hostId].m_version = 0;
+ globalData.m_nodeInfo[hostId].m_mysql_version = 0;
+
+ /**
+ * Inform QMGR that client has connected
+ */
+ signal->theData[0] = hostId;
+ if (ERROR_INSERTED(9005))
+ {
+ sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1);
+ }
+ else
+ {
+ sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA);
+ }
+
+ /* Automatically subscribe events for MGM nodes.
+ */
+ if (type == NodeInfo::MGM)
+ {
+ jam();
+ globalTransporterRegistry.setIOState(hostId, NoHalt);
+ }
+
+ //------------------------------------------
+ // Also report this event to the Event handler
+ //------------------------------------------
+ signal->theData[0] = NDB_LE_Connected;
+ signal->theData[1] = hostId;
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+}
+
+void
+Trpman::execCLOSE_COMREQ(Signal* signal)
+{
+ // Close communication with the node and halt input/output from
+ // other blocks than QMGR
+
+ CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0];
+
+ const BlockReference userRef = closeCom->xxxBlockRef;
+ Uint32 requestType = closeCom->requestType;
+ Uint32 failNo = closeCom->failNo;
+// Uint32 noOfNodes = closeCom->noOfNodes;
+
+ jamEntry();
+ for (unsigned i = 0; i < MAX_NODES; i++)
+ {
+ if (NodeBitmask::get(closeCom->theNodes, i))
+ {
+ jam();
+
+ //-----------------------------------------------------
+ // Report that the connection to the node is closed
+ //-----------------------------------------------------
+ signal->theData[0] = NDB_LE_CommunicationClosed;
+ signal->theData[1] = i;
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+
+ globalTransporterRegistry.setIOState(i, HaltIO);
+ globalTransporterRegistry.do_disconnect(i);
+ }
+ }
+
+ if (requestType != CloseComReqConf::RT_NO_REPLY)
+ {
+ ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) ||
+ ((requestType == CloseComReqConf::RT_NODE_FAILURE) &&
+ (failNo != 0)));
+ jam();
+ CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend();
+ closeComConf->xxxBlockRef = userRef;
+ closeComConf->requestType = requestType;
+ closeComConf->failNo = failNo;
+
+ /* Note assumption that noOfNodes and theNodes
+ * bitmap is not trampled above
+ * signals received from the remote node.
+ */
+ sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
+ }
+}
+
+void
+Trpman::execENABLE_COMREQ(Signal* signal)
+{
+ jamEntry();
+ const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr();
+
+ /* Need to copy out signal data to not clobber it with sendSignal(). */
+ Uint32 senderRef = enableComReq->m_senderRef;
+ Uint32 senderData = enableComReq->m_senderData;
+ Uint32 nodes[NodeBitmask::Size];
+ MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size);
+
+ /* Enable communication with all our NDB blocks to these nodes. */
+ Uint32 search_from = 0;
+ for (;;)
+ {
+ Uint32 tStartingNode = NodeBitmask::find(nodes, search_from);
+ if (tStartingNode == NodeBitmask::NotFound)
+ break;
+ search_from = tStartingNode + 1;
+
+ globalTransporterRegistry.setIOState(tStartingNode, NoHalt);
+ setNodeInfo(tStartingNode).m_connected = true;
+
+ //-----------------------------------------------------
+ // Report that the version of the node
+ //-----------------------------------------------------
+ signal->theData[0] = NDB_LE_ConnectedApiVersion;
+ signal->theData[1] = tStartingNode;
+ signal->theData[2] = getNodeInfo(tStartingNode).m_version;
+ signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version;
+
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB);
+ //-----------------------------------------------------
+ }
+
+ EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend();
+ enableComConf->m_senderRef = reference();
+ enableComConf->m_senderData = senderData;
+ MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size);
+ sendSignal(senderRef, GSN_ENABLE_COMCONF, signal,
+ EnableComConf::SignalLength, JBA);
+}
+
+void
+Trpman::execDISCONNECT_REP(Signal *signal)
+{
+ const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
+ const Uint32 hostId = rep->nodeId;
+ jamEntry();
+
+ setNodeInfo(hostId).m_connected = false;
+ setNodeInfo(hostId).m_connectCount++;
+ const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
+ ndbrequire(type != NodeInfo::INVALID);
+
+ sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
+ DisconnectRep::SignalLength, JBA);
+
+ signal->theData[0] = hostId;
+ sendSignal(CMVMI_REF, GSN_CANCEL_SUBSCRIPTION_REQ, signal, 1, JBB);
+
+ signal->theData[0] = NDB_LE_Disconnected;
+ signal->theData[1] = hostId;
+ sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+}
+
+/**
+ * execROUTE_ORD
+ * Allows other blocks to route signals as if they
+ * came from TRPMAN
+ * Useful in ndbmtd for synchronising signals w.r.t
+ * external signals received from other nodes which
+ * arrive from the same thread that runs TRPMAN
+ */
+void
+Trpman::execROUTE_ORD(Signal* signal)
+{
+ jamEntry();
+ if (!assembleFragments(signal))
+ {
+ jam();
+ return;
+ }
+
+ SectionHandle handle(this, signal);
+
+ RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+ Uint32 dstRef = ord->dstRef;
+ Uint32 srcRef = ord->srcRef;
+ Uint32 gsn = ord->gsn;
+ /* ord->cnt ignored */
+
+ Uint32 nodeId = refToNode(dstRef);
+
+ if (likely((nodeId == 0) ||
+ getNodeInfo(nodeId).m_connected))
+ {
+ jam();
+ Uint32 secCount = handle.m_cnt;
+ ndbrequire(secCount >= 1 && secCount <= 3);
+
+ jamLine(secCount);
+
+ /**
+ * Put section 0 in signal->theData
+ */
+ Uint32 sigLen = handle.m_ptr[0].sz;
+ ndbrequire(sigLen <= 25);
+ copy(signal->theData, handle.m_ptr[0]);
+
+ SegmentedSectionPtr save = handle.m_ptr[0];
+ for (Uint32 i = 0; i < secCount - 1; i++)
+ handle.m_ptr[i] = handle.m_ptr[i+1];
+ handle.m_cnt--;
+
+ sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle);
+
+ handle.m_cnt = 1;
+ handle.m_ptr[0] = save;
+ releaseSections(handle);
+ return ;
+ }
+
+ releaseSections(handle);
+ warningEvent("Unable to route GSN: %d from %x to %x",
+ gsn, srcRef, dstRef);
+}
+
+void
+Trpman::execDBINFO_SCANREQ(Signal *signal)
+{
+ DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+ const Ndbinfo::ScanCursor* cursor =
+ CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
+ Ndbinfo::Ratelimit rl;
+
+ jamEntry();
+
+ switch(req.tableId){
+ case Ndbinfo::TRANSPORTERS_TABLEID:
+ {
+ jam();
+ Uint32 rnode = cursor->data[0];
+ if (rnode == 0)
+ rnode++; // Skip node 0
+
+ while (rnode < MAX_NODES)
+ {
+ switch(getNodeInfo(rnode).m_type)
+ {
+ default:
+ {
+ jam();
+ Ndbinfo::Row row(signal, req);
+ row.write_uint32(getOwnNodeId()); // Node id
+ row.write_uint32(rnode); // Remote node id
+ row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State
+ ndbinfo_send_row(signal, req, row, rl);
+ break;
+ }
+
+ case NodeInfo::INVALID:
+ jam();
+ break;
+ }
+
+ rnode++;
+ if (rl.need_break(req))
+ {
+ jam();
+ ndbinfo_send_scan_break(signal, req, rl, rnode);
+ return;
+ }
+ }
+ break;
+ }
+
+ default:
+ break;
+ }
+
+ ndbinfo_send_scan_conf(signal, req, rl);
+}
+
+void
+Trpman::execNDB_TAMPER(Signal* signal)
+{
+ jamEntry();
+#ifdef ERROR_INSERT
+ if (signal->theData[0] == 9003)
+ {
+ if (MAX_RECEIVED_SIGNALS < 1024)
+ {
+ MAX_RECEIVED_SIGNALS = 1024;
+ }
+ else
+ {
+ MAX_RECEIVED_SIGNALS = 1 + (rand() % 128);
+ }
+ ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS);
+ CLEAR_ERROR_INSERT_VALUE;
+ }
+#endif
+}//execNDB_TAMPER()
+
+void
+Trpman::execDUMP_STATE_ORD(Signal* signal)
+{
+ DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
+ Uint32 arg = dumpState->args[0]; (void)arg;
+
+#ifdef ERROR_INSERT
+ if (arg == 9000 || arg == 9002)
+ {
+ SET_ERROR_INSERT_VALUE(arg);
+ for (Uint32 i = 1; i<signal->getLength(); i++)
+ c_error_9000_nodes_mask.set(signal->theData[i]);
+ }
+
+ if (arg == 9001)
+ {
+ CLEAR_ERROR_INSERT_VALUE;
+ if (signal->getLength() == 1 || signal->theData[1])
+ {
+ for (Uint32 i = 0; i<MAX_NODES; i++)
+ {
+ if (c_error_9000_nodes_mask.get(i))
+ {
+ signal->theData[0] = 0;
+ signal->theData[1] = i;
+ execOPEN_COMREQ(signal);
+ }
+ }
+ }
+ c_error_9000_nodes_mask.clear();
+ }
+
+ if (arg == 9004 && signal->getLength() == 2)
+ {
+ SET_ERROR_INSERT_VALUE(9004);
+ c_error_9000_nodes_mask.clear();
+ c_error_9000_nodes_mask.set(signal->theData[1]);
+ }
+
+ if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004))
+ {
+ Uint32 db = signal->theData[1];
+ Uint32 i = c_error_9000_nodes_mask.find(0);
+ signal->theData[0] = i;
+ sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
+ ndbout_c("stopping %u using %u", i, db);
+ CLEAR_ERROR_INSERT_VALUE;
+ }
+#endif
+
+#ifdef ERROR_INSERT
+ /* <Target NodeId> dump 9992 <NodeId list>
+ * On Target NodeId, block receiving signals from NodeId list
+ *
+ * <Target NodeId> dump 9993 <NodeId list>
+ * On Target NodeId, resume receiving signals from NodeId list
+ *
+ * <Target NodeId> dump 9991
+ * On Target NodeId, resume receiving signals from any blocked node
+ *
+ *
+ * See also code in QMGR for blocking receive from nodes based
+ * on HB roles.
+ *
+ */
+ if((arg == 9993) || /* Unblock recv from nodeid */
+ (arg == 9992)) /* Block recv from nodeid */
+ {
+ bool block = (arg == 9992);
+ for (Uint32 n = 1; n < signal->getLength(); n++)
+ {
+ Uint32 nodeId = signal->theData[n];
+
+ if ((nodeId > 0) &&
+ (nodeId < MAX_NODES))
+ {
+ if (block)
+ {
+ ndbout_c("CMVMI : Blocking receive from node %u", nodeId);
+
+ globalTransporterRegistry.blockReceive(nodeId);
+ }
+ else
+ {
+ ndbout_c("CMVMI : Unblocking receive from node %u", nodeId);
+
+ globalTransporterRegistry.unblockReceive(nodeId);
+ }
+ }
+ else
+ {
+ ndbout_c("CMVMI : Ignoring dump %u for node %u",
+ arg, nodeId);
+ }
+ }
+ }
+ if (arg == 9990) /* Block recv from all ndbd matching pattern */
+ {
+ Uint32 pattern = 0;
+ if (signal->getLength() > 1)
+ {
+ pattern = signal->theData[1];
+ ndbout_c("CMVMI : Blocking receive from all ndbds matching pattern -%s-",
+ ((pattern == 1)? "Other side":"Unknown"));
+ }
+
+ for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
+ {
+ if (globalTransporterRegistry.is_connected(node))
+ {
+ if (getNodeInfo(node).m_type == NodeInfo::DB)
+ {
+ if (!globalTransporterRegistry.isBlocked(node))
+ {
+ switch (pattern)
+ {
+ case 1:
+ {
+ /* Match if given node is on 'other side' of
+ * 2-replica cluster
+ */
+ if ((getOwnNodeId() & 1) != (node & 1))
+ {
+ /* Node is on the 'other side', match */
+ break;
+ }
+ /* Node is on 'my side', don't match */
+ continue;
+ }
+ default:
+ break;
+ }
+ ndbout_c("CMVMI : Blocking receive from node %u", node);
+ globalTransporterRegistry.blockReceive(node);
+ }
+ }
+ }
+ }
+ }
+ if (arg == 9991) /* Unblock recv from all blocked */
+ {
+ for (Uint32 node = 0; node < MAX_NODES; node++)
+ {
+ if (globalTransporterRegistry.isBlocked(node))
+ {
+ ndbout_c("CMVMI : Unblocking receive from node %u", node);
+ globalTransporterRegistry.unblockReceive(node);
+ }
+ }
+ }
+#endif
+}
+
+TrpmanProxy::TrpmanProxy(Block_context & ctx) :
+ LocalProxy(TRPMAN, ctx)
+{
+ addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
+ addRecSignal(GSN_OPEN_COMREQ, &TrpmanProxy::execOPEN_COMREQ);
+ addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
+ addRecSignal(GSN_DISCONNECT_REP, &TrpmanProxy::execDISCONNECT_REP);
+ addRecSignal(GSN_CONNECT_REP, &TrpmanProxy::execCONNECT_REP);
+ addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
+}
+
+TrpmanProxy::~TrpmanProxy()
+{
+}
+
+SimulatedBlock*
+TrpmanProxy::newWorker(Uint32 instanceNo)
+{
+ return new Trpman(m_ctx, instanceNo);
+}
+
+BLOCK_FUNCTIONS(TrpmanProxy);
+
+/**
+ * TODO TrpmanProxy need to have operation records
+ * to support splicing a request onto several Trpman-instances
+ * according to how receive-threads are assigned to instances
+ */
+void
+TrpmanProxy::execOPEN_COMREQ(Signal* signal)
+{
+ jamEntry();
+ SectionHandle handle(this, signal);
+ sendSignal(workerRef(0), GSN_OPEN_COMREQ, signal,
+ signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execCONNECT_REP(Signal *signal)
+{
+ jamEntry();
+ SectionHandle handle(this, signal);
+ sendSignal(workerRef(0), GSN_CONNECT_REP, signal,
+ signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
+{
+ jamEntry();
+ SectionHandle handle(this, signal);
+ sendSignal(workerRef(0), GSN_CLOSE_COMREQ, signal,
+ signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execENABLE_COMREQ(Signal* signal)
+{
+ jamEntry();
+ SectionHandle handle(this, signal);
+ sendSignal(workerRef(0), GSN_ENABLE_COMREQ, signal,
+ signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execDISCONNECT_REP(Signal *signal)
+{
+ jamEntry();
+ SectionHandle handle(this, signal);
+ sendSignal(workerRef(0), GSN_DISCONNECT_REP, signal,
+ signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execROUTE_ORD(Signal* signal)
+{
+ jamEntry();
+ SectionHandle handle(this, signal);
+ sendSignal(workerRef(0), GSN_ROUTE_ORD, signal,
+ signal->getLength(), JBB, &handle);
+}
=== added file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp 2011-11-16 15:38:25 +0000
@@ -0,0 +1,66 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef TRPMAN_H
+#define TRPMAN_H
+
+#include <pc.hpp>
+#include <SimulatedBlock.hpp>
+#include <LocalProxy.hpp>
+
+class Trpman : public SimulatedBlock
+{
+public:
+ Trpman(Block_context& ctx, Uint32 instanceNumber = 0);
+ virtual ~Trpman();
+ BLOCK_DEFINES(Trpman);
+
+ void execCLOSE_COMREQ(Signal *signal);
+ void execOPEN_COMREQ(Signal *signal);
+ void execENABLE_COMREQ(Signal *signal);
+ void execDISCONNECT_REP(Signal *signal);
+ void execCONNECT_REP(Signal *signal);
+ void execROUTE_ORD(Signal* signal);
+
+ void execDBINFO_SCANREQ(Signal*);
+
+ void execNDB_TAMPER(Signal*);
+ void execDUMP_STATE_ORD(Signal*);
+protected:
+
+};
+
+class TrpmanProxy : public LocalProxy
+{
+public:
+ TrpmanProxy(Block_context& ctx);
+ virtual ~TrpmanProxy();
+ BLOCK_DEFINES(TrpmanProxy);
+
+ void execCLOSE_COMREQ(Signal *signal);
+ void execOPEN_COMREQ(Signal *signal);
+ void execENABLE_COMREQ(Signal *signal);
+ void execDISCONNECT_REP(Signal *signal);
+ void execCONNECT_REP(Signal *signal);
+ void execROUTE_ORD(Signal* signal);
+
+ void execNDB_TAMPER(Signal*);
+ void execDUMP_STATE_ORD(Signal*);
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+#endif
=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp 2011-10-07 13:15:08 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp 2011-11-16 11:05:46 +0000
@@ -161,9 +161,13 @@ init_global_memory_manager(EmulatorData
ed.m_mem_manager->set_resource_limit(rl);
}
- Uint32 maxopen = 4 * 4; // 4 redo parts, max 4 files per part
+ Uint32 logParts = NDB_DEFAULT_LOG_PARTS;
+ ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS, &logParts);
+
+ Uint32 maxopen = logParts * 4; // 4 redo parts, max 4 files per part
Uint32 filebuffer = NDB_FILE_BUFFER_SIZE;
Uint32 filepages = (filebuffer / GLOBAL_PAGE_SIZE) * maxopen;
+ globalData.ndbLogParts = logParts;
{
/**
=== modified file 'storage/ndb/src/kernel/vm/Ndbinfo.hpp'
--- a/storage/ndb/src/kernel/vm/Ndbinfo.hpp 2011-10-20 19:52:11 +0000
+++ b/storage/ndb/src/kernel/vm/Ndbinfo.hpp 2011-11-18 06:47:23 +0000
@@ -194,7 +194,8 @@ public:
SPJ_SCAN_BATCHES_RETURNED_COUNTER = 20,
SPJ_SCAN_ROWS_RETURNED_COUNTER = 21,
SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER = 22,
- SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER = 23
+ SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER = 23,
+ LOCAL_READ_COUNTER = 24
};
struct counter_entry {
=== modified file 'storage/ndb/tools/ndbinfo_sql.cpp'
--- a/storage/ndb/tools/ndbinfo_sql.cpp 2011-11-16 08:17:17 +0000
+++ b/storage/ndb/tools/ndbinfo_sql.cpp 2011-11-18 06:47:23 +0000
@@ -134,6 +134,7 @@ struct view {
" WHEN 21 THEN \"SCAN_ROWS_RETURNED\""
" WHEN 22 THEN \"PRUNED_RANGE_SCANS_RECEIVED\""
" WHEN 23 THEN \"CONST_PRUNED_RANGE_SCANS_RECEIVED\""
+ " WHEN 24 THEN \"LOCAL_READS\""
" ELSE \"<unknown>\" "
" END AS counter_name, "
"val "
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-trunk-cluster branch (magnus.blaudd:3414 to 3415) | magnus.blaudd | 21 Nov |