#At file:///home/msvensson/mysql/6.4/
2700 Magnus Svensson 2008-08-08 [merge]
Merge
modified:
mysql-test/suite/ndb/r/ndb_multi_row.result
mysql-test/suite/ndb/r/ndb_trigger.result
mysql-test/suite/ndb/t/ndb_alter_table_backup.test
mysql-test/suite/ndb/t/ndb_trigger.test
mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result
mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
storage/ndb/include/kernel/signaldata/LqhKey.hpp
storage/ndb/include/kernel/signaldata/NextScan.hpp
storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp
storage/ndb/include/kernel/signaldata/TupKey.hpp
storage/ndb/include/mgmapi/mgmapi.h
storage/ndb/include/ndb_version.h.in
storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
storage/ndb/include/ndbapi/NdbOperation.hpp
storage/ndb/include/ndbapi/NdbTransaction.hpp
storage/ndb/src/common/debugger/signaldata/LqhKey.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/vm/LongSignal.hpp
storage/ndb/src/kernel/vm/SectionReader.cpp
storage/ndb/src/kernel/vm/SectionReader.hpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/ndbapi/NdbOperationExec.cpp
storage/ndb/src/ndbapi/NdbScanFilter.cpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/test/ndbapi/testDict.cpp
storage/ndb/test/ndbapi/testScanFilter.cpp
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/run-test/daily-basic-tests.txt
storage/ndb/tools/restore/Restore.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_multi_row.result'
--- a/mysql-test/suite/ndb/r/ndb_multi_row.result 2007-11-01 14:08:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_multi_row.result 2008-08-07 05:29:44 +0000
@@ -63,6 +63,6 @@ t4
drop table t1, t2, t3, t4;
drop table if exists t1, t3, t4;
Warnings:
-Error 155 Table 'test.t1' doesn't exist
-Error 155 Table 'test.t3' doesn't exist
-Error 155 Table 'test.t4' doesn't exist
+Note 1051 Unknown table 't1'
+Note 1051 Unknown table 't3'
+Note 1051 Unknown table 't4'
=== modified file 'mysql-test/suite/ndb/r/ndb_trigger.result'
--- a/mysql-test/suite/ndb/r/ndb_trigger.result 2007-07-04 20:38:53 +0000
+++ b/mysql-test/suite/ndb/r/ndb_trigger.result 2008-08-07 05:29:44 +0000
@@ -1,4 +1,7 @@
drop table if exists t1, t2, t3, t4, t5;
+flush status;
+drop table if exists t1, t2, t3, t4, t5;
+flush status;
create table t1 (id int primary key, a int not null, b decimal (63,30) default 0) engine=ndb;
create table t2 (op char(1), a int not null, b decimal (63,30)) engine=ndb;
create table t3 engine=ndb select 1 as i;
@@ -312,4 +315,29 @@ id xy
DROP TRIGGER t1_delete;
DROP TRIGGER t4_delete;
DROP TABLE t1, t2, t3, t4, t5;
+create table t1(a int, b varchar(10), c date) engine=ndb;
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+SET new.c = date(now());
+End //
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name event_object_table
+trg1 t1
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+SET new.c = date(now());
+End //
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name event_object_table
+trg1 t1
+rename table t1 to t2;
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name event_object_table
+trg1 t2
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+trigger_name event_object_table
+trg1 t2
+drop table t2;
End of 5.1 tests
=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_backup.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_backup.test 2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_backup.test 2008-08-07 11:52:50 +0000
@@ -19,8 +19,8 @@ DROP TABLE IF EXISTS t1;
--echo *********************************
--echo * restore tables w/ new column from little endian
--echo *********************************
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT 2>&1
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_le >> $NDB_TOOLS_OUTPUT 2>&1
SHOW TABLES;
SHOW CREATE TABLE t1;
SELECT * FROM t1 WHERE a = 1 or a = 10 or a = 20 or a = 30 ORDER BY a;
@@ -32,8 +32,8 @@ DROP TABLE t1;
--echo *********************************
--echo * restore tables w/ new column from big endian
--echo *********************************
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT
---exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT 2>&1
+--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -r $MYSQL_TEST_DIR/std_data/ndb_backup51_d2_be >> $NDB_TOOLS_OUTPUT 2>&1
SHOW TABLES;
SHOW CREATE TABLE t1;
SELECT * FROM t1 WHERE a = 1 or a = 10 or a = 20 or a = 30 ORDER BY a;
=== modified file 'mysql-test/suite/ndb/t/ndb_trigger.test'
--- a/mysql-test/suite/ndb/t/ndb_trigger.test 2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_trigger.test 2008-08-07 11:51:34 +0000
@@ -1,5 +1,6 @@
# Tests which involve triggers and NDB storage engine
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
+--source include/not_embedded.inc
#
# Test for bug#18437 "Wrong values inserted with a before update
@@ -13,7 +14,12 @@
#
--disable_warnings
+connection server2;
drop table if exists t1, t2, t3, t4, t5;
+flush status;
+connection server1;
+drop table if exists t1, t2, t3, t4, t5;
+flush status;
--enable_warnings
create table t1 (id int primary key, a int not null, b decimal (63,30) default 0) engine=ndb;
@@ -217,4 +223,39 @@ DROP TRIGGER t1_delete;
DROP TRIGGER t4_delete;
DROP TABLE t1, t2, t3, t4, t5;
+# Test for bug#36658
+# Verify that rename table
+# doesn't remove triggers
+
+connection server1;
+create table t1(a int, b varchar(10), c date) engine=ndb;
+delimiter //;
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+ SET new.c = date(now());
+End //
+delimiter ;//
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server2;
+delimiter //;
+CREATE TRIGGER trg1 BEFORE UPDATE ON t1 FOR EACH ROW BEGIN
+ SET new.c = date(now());
+End //
+delimiter ;//
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server1;
+rename table t1 to t2;
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server2;
+select trigger_name,event_object_table from information_schema.triggers where
+trigger_name='trg1';
+
+connection server1;
+drop table t2;
+
--echo End of 5.1 tests
=== modified file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result 2008-02-25 13:50:20 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_ddl_multi.result 2008-08-07 05:29:44 +0000
@@ -33,11 +33,11 @@ drop table mysqltest.t1;
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query 102 # ALTER DATABASE mysqltest CHARACTER SET latin1
-master-bin.000001 # Query 102 # use `mysqltest`; drop table `t1`
+master-bin.000001 # Query 102 # use `mysqltest`; drop table `mysqltest`.`t1`
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query 102 # ALTER DATABASE mysqltest CHARACTER SET latin1
-master-bin.000001 # Query 102 # use `mysqltest`; drop table `t1`
+master-bin.000001 # Query 102 # use `mysqltest`; drop table `mysqltest`.`t1`
reset master;
reset master;
use test;
@@ -161,10 +161,10 @@ Log_name Pos Event_type Server_id End_lo
master-bin1.000001 # Query 1 # use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001 # Query 1 # use `test`; create table t2 (a int key) engine=ndb
master-bin1.000001 # Query 1 # use `test`; create table t3 (a int key) engine=ndb
-master-bin1.000001 # Query 1 # use `test`; rename table `test.t3` to `test.t4`
-master-bin1.000001 # Query 1 # use `test`; rename table `test.t2` to `test.t3`
-master-bin1.000001 # Query 1 # use `test`; rename table `test.t1` to `test.t2`
-master-bin1.000001 # Query 1 # use `test`; rename table `test.t4` to `test.t1`
+master-bin1.000001 # Query 1 # use `test`; rename table `test`.`t3` to `test`.`t4`
+master-bin1.000001 # Query 1 # use `test`; rename table `test`.`t2` to `test`.`t3`
+master-bin1.000001 # Query 1 # use `test`; rename table `test`.`t1` to `test`.`t2`
+master-bin1.000001 # Query 1 # use `test`; rename table `test`.`t4` to `test`.`t1`
drop table t1;
drop table t2;
drop table t3;
@@ -188,7 +188,7 @@ master-bin1.000001 # Table_map 102 # tab
master-bin1.000001 # Write_rows 102 # table_id: #
master-bin1.000001 # Write_rows 102 # table_id: # flags: STMT_END_F
master-bin1.000001 # Query 102 # COMMIT
-master-bin1.000001 # Query 1 # use `test`; rename table `test.t1` to `test.t2`
+master-bin1.000001 # Query 1 # use `test`; rename table `test`.`t1` to `test`.`t2`
master-bin1.000001 # Query 102 # BEGIN
master-bin1.000001 # Table_map 102 # table_id: # (test.t2)
master-bin1.000001 # Table_map 102 # table_id: # (mysql.ndb_apply_status)
=== modified file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result 2008-06-19 07:18:42 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_bin.result 2008-08-07 05:29:44 +0000
@@ -53,8 +53,8 @@ use mysqltest;
insert into t2 values (1,1);
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
-master-bin1.000001 # Query 1 # use `mysqltest`; drop table `t1`
-master-bin1.000001 # Query 1 # use `mysqltest`; drop table `t2`
+master-bin1.000001 # Query 1 # use `mysqltest`; drop table `mysqltest`.`t1`
+master-bin1.000001 # Query 1 # use `mysqltest`; drop table `mysqltest`.`t2`
master-bin1.000001 # Query 1 # use `mysqltest`; create table t1 (d int key, e int) engine=ndb
master-bin1.000001 # Query 1 # use `mysqltest`; create table t2 (d int key, e int) engine=ndb
master-bin1.000001 # Query 102 # BEGIN
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2008-07-01 07:53:42 +0000
+++ b/sql/ha_ndbcluster.cc 2008-08-07 13:17:36 +0000
@@ -158,6 +158,7 @@ static uchar *ndbcluster_get_key(NDB_SHA
static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*,
const NdbRecord *, struct Ndb_statistics *);
+THD *injector_thd= 0;
// Util thread variables
pthread_t ndb_util_thread;
@@ -6955,6 +6956,17 @@ int ha_ndbcluster::rename_table(const ch
DBUG_ENTER("ha_ndbcluster::rename_table");
DBUG_PRINT("info", ("Renaming %s to %s", from, to));
+
+ if (thd == injector_thd)
+ {
+ /*
+ Table was renamed remotely is already
+ renamed inside ndb.
+ Just rename .ndb file.
+ */
+ DBUG_RETURN(handler::rename_table(from, to));
+ }
+
set_dbname(from, old_dbname);
set_dbname(to, new_dbname);
set_tabname(from);
@@ -7025,7 +7037,7 @@ int ha_ndbcluster::rename_table(const ch
}
ERR_RETURN(ndb_error);
}
-
+
// Rename .ndb file
if ((result= handler::rename_table(from, to)))
{
@@ -7323,6 +7335,17 @@ int ha_ndbcluster::delete_table(const ch
int error= 0;
DBUG_ENTER("ha_ndbcluster::delete_table");
DBUG_PRINT("enter", ("name: %s", name));
+
+ if (thd == injector_thd)
+ {
+ /*
+ Table was dropped remotely is already
+ dropped inside ndb.
+ Just drop local files.
+ */
+ DBUG_RETURN(handler::delete_table(name));
+ }
+
set_dbname(name);
set_tabname(name);
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2008-06-25 13:06:41 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2008-08-07 13:17:36 +0000
@@ -71,7 +71,7 @@ my_bool ndb_binlog_is_ready= FALSE;
Has one sole purpose, for setting the in_use table member variable
in get_share(...)
*/
-THD *injector_thd= 0;
+extern THD * injector_thd; // Declared in ha_ndbcluster.cc
/*
Global reference to ndb injector thd object.
@@ -140,12 +140,6 @@ static void ndb_free_schema_object(NDB_S
bool have_lock);
/*
- Global variables for holding the ndb_binlog_index table reference
-*/
-static TABLE *ndb_binlog_index= 0;
-static TABLE_LIST binlog_tables;
-
-/*
Helper functions
*/
@@ -308,15 +302,6 @@ static void run_query(THD *thd, char *bu
thd->transaction.all= save_thd_transaction_all;
thd->transaction.stmt= save_thd_transaction_stmt;
thd->net= save_thd_net;
-
- if (thd == injector_thd)
- {
- /*
- running the query will close all tables, including the ndb_binlog_index
- used in injector_thd
- */
- ndb_binlog_index= 0;
- }
}
static void
@@ -1307,8 +1292,9 @@ int ndbcluster_log_schema_op(THD *thd,
DBUG_RETURN(0);
/* redo the drop table query as is may contain several tables */
query= tmp_buf2;
- query_length= (uint) (strxmov(tmp_buf2, "drop table `",
- table_name, "`", NullS) - tmp_buf2);
+ query_length= (uint) (strxmov(tmp_buf2, "drop table ",
+ "`", db, "`", ".",
+ "`", table_name, "`", NullS) - tmp_buf2);
type_str= "drop table";
break;
case SOT_RENAME_TABLE_PREPARE:
@@ -1318,9 +1304,11 @@ int ndbcluster_log_schema_op(THD *thd,
case SOT_RENAME_TABLE:
/* redo the rename table query as is may contain several tables */
query= tmp_buf2;
- query_length= (uint) (strxmov(tmp_buf2, "rename table `",
- db, ".", table_name, "` to `",
- new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
+ query_length= (uint) (strxmov(tmp_buf2, "rename table ",
+ "`", db, "`", ".",
+ "`", table_name, "` to ",
+ "`", new_db, "`", ".",
+ "`", new_table_name, "`", NullS) - tmp_buf2);
type_str= "rename table";
break;
case SOT_CREATE_TABLE:
@@ -1863,17 +1851,53 @@ ndb_binlog_thread_handle_schema_event(TH
if (schema->node_id != node_id)
{
int log_query= 0, post_epoch_unlock= 0;
+ char errmsg[MYSQL_ERRMSG_SIZE];
+
switch (schema_type)
{
- case SOT_DROP_TABLE:
- // fall through
case SOT_RENAME_TABLE:
// fall through
case SOT_RENAME_TABLE_NEW:
- post_epoch_log_list->push_back(schema, mem_root);
- /* acknowledge this query _after_ epoch completion */
- post_epoch_unlock= 1;
- break;
+ {
+ uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+ "NDB Binlog: Skipping renaming locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
+
+ errmsg[end]= '\0';
+ }
+ // fall through
+ case SOT_DROP_TABLE:
+ if (schema_type == SOT_DROP_TABLE)
+ {
+ uint end= snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
+ "NDB Binlog: Skipping dropping locally defined table '%s.%s' from binlog schema event '%s' from node %d. ",
+ schema->db, schema->name, schema->query,
+ schema->node_id);
+ errmsg[end]= '\0';
+ }
+ if (! ndbcluster_check_if_local_table(schema->db, schema->name))
+ {
+ const int no_print_error[1]=
+ {ER_BAD_TABLE_ERROR}; /* ignore missing table */
+ run_query(thd, schema->query,
+ schema->query + schema->query_length,
+ no_print_error, // /* don't print error */
+ TRUE); // /* don't binlog the query */
+
+ /* binlog dropping table after any table operations */
+ post_epoch_log_list->push_back(schema, mem_root);
+ /* acknowledge this query _after_ epoch completion */
+ post_epoch_unlock= 1;
+ }
+ else
+ {
+ /* Tables exists as a local table, leave it */
+ DBUG_PRINT("info", ((const char *) errmsg));
+ sql_print_error((const char *) errmsg);
+ log_query= 1;
+ }
+ // Fall through
case SOT_TRUNCATE_TABLE:
{
char key[FN_REFLEN];
@@ -1909,6 +1933,8 @@ ndb_binlog_thread_handle_schema_event(TH
free_share(&share);
}
}
+ if (schema_type != SOT_TRUNCATE_TABLE)
+ break;
// fall through
case SOT_CREATE_TABLE:
pthread_mutex_lock(&LOCK_open);
@@ -2531,8 +2557,8 @@ struct ndb_binlog_index_row {
/*
Open the ndb_binlog_index table
*/
-static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
- TABLE **ndb_binlog_index)
+static int open_and_lock_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
+ TABLE **ndb_binlog_index)
{
static char repdb[]= NDB_REP_DB;
static char reptable[]= NDB_REP_TABLE;
@@ -2544,9 +2570,8 @@ static int open_ndb_binlog_index(THD *th
tables->lock_type= TL_WRITE;
thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
tables->required_type= FRMTYPE_TABLE;
- uint counter;
thd->clear_error();
- if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
+ if (simple_open_n_lock_tables(thd, tables))
{
if (thd->killed)
sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
@@ -2563,32 +2588,6 @@ static int open_ndb_binlog_index(THD *th
return 0;
}
-
-/*
- Check if ndb_binlog_index is lockable, if not close, to free for
- other threads use
-*/
-
-static void
-ndb_check_ndb_binlog_index(THD *thd)
-{
- if (!ndb_binlog_index)
- return;
-
- bool need_reopen;
- if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
- {
- ndb_binlog_index= 0;
- close_thread_tables(thd);
- ndb_binlog_index= 0;
- return;
- }
-
- mysql_unlock_tables(thd, thd->lock);
- thd->lock= 0;
- return;
-}
-
/*
Insert one row in the ndb_binlog_index
*/
@@ -2597,8 +2596,10 @@ static int
ndb_add_ndb_binlog_index(THD *thd, ndb_binlog_index_row *row)
{
int error= 0;
- bool need_reopen;
ndb_binlog_index_row *first= row;
+ TABLE *ndb_binlog_index= 0;
+ TABLE_LIST binlog_tables;
+
/*
Turn of binlogging to prevent the table changes to be written to
the binary log.
@@ -2606,28 +2607,12 @@ ndb_add_ndb_binlog_index(THD *thd, ndb_b
ulong saved_options= thd->options;
thd->options&= ~(OPTION_BIN_LOG);
- for ( ; ; ) /* loop for need_reopen */
- {
- if (!ndb_binlog_index && open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
- {
- error= -1;
- goto add_ndb_binlog_index_err;
- }
- if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
- {
- if (need_reopen)
- {
- TABLE_LIST *p_binlog_tables= &binlog_tables;
- close_tables_for_reopen(thd, &p_binlog_tables);
- ndb_binlog_index= 0;
- continue;
- }
- sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
- error= -1;
- goto add_ndb_binlog_index_err;
- }
- break;
+ if (open_and_lock_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
+ {
+ sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
+ error= -1;
+ goto add_ndb_binlog_index_err;
}
/*
@@ -2676,13 +2661,8 @@ ndb_add_ndb_binlog_index(THD *thd, ndb_b
}
} while (row);
- mysql_unlock_tables(thd, thd->lock);
- thd->lock= 0;
- thd->options= saved_options;
- return 0;
add_ndb_binlog_index_err:
close_thread_tables(thd);
- ndb_binlog_index= 0;
thd->options= saved_options;
return error;
}
@@ -5228,15 +5208,6 @@ restart:
!ndb_binlog_running))
break; /* Shutting down server */
- if (ndb_binlog_index && ndb_binlog_index->s->version < refresh_version)
- {
- if (ndb_binlog_index->s->version < refresh_version)
- {
- close_thread_tables(thd);
- ndb_binlog_index= 0;
- }
- }
-
MEM_ROOT **root_ptr=
my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
MEM_ROOT *old_root= *root_ptr;
@@ -5294,17 +5265,8 @@ restart:
}
}
- /*
- For each epoch atleast one check should be made to see if ndb_binlog_index
- table is lockable. Variable 'do_check_ndb_binlog_index' is used as a flag
- to signal if a check is needed for current epoch.
- */
- int do_check_ndb_binlog_index= 1;
-
if (res > 0)
{
- do_check_ndb_binlog_index= 1;
-
DBUG_PRINT("info", ("pollEvents res: %d", res));
thd->proc_info= "Processing events";
NdbEventOperation *pOp= i_ndb->nextEvent();
@@ -5577,19 +5539,12 @@ restart:
if (ndb_update_ndb_binlog_index)
{
ndb_add_ndb_binlog_index(thd, rows);
- do_check_ndb_binlog_index= 0;
}
ndb_latest_applied_binlog_epoch= gci;
break;
}
ndb_latest_handled_binlog_epoch= gci;
- if (do_check_ndb_binlog_index)
- {
- ndb_check_ndb_binlog_index(thd);
- do_check_ndb_binlog_index= 0;
- }
-
#ifdef RUN_NDB_BINLOG_TIMER
gci_timer.stop();
sql_print_information("gci %ld event_count %d write time "
@@ -5622,25 +5577,16 @@ restart:
free_root(&mem_root, MYF(0));
*root_ptr= old_root;
ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
-
- if (do_check_ndb_binlog_index)
- {
- ndb_check_ndb_binlog_index(thd);
- do_check_ndb_binlog_index= 0;
- }
}
if (do_ndbcluster_binlog_close_connection == BCCC_restart)
{
ndb_binlog_tables_inited= FALSE;
- close_thread_tables(thd);
- ndb_binlog_index= 0;
goto restart;
}
err:
sql_print_information("Stopping Cluster Binlog");
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
thd->proc_info= "Shutting down";
- close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */
injector_thd= 0;
=== modified file 'storage/ndb/include/kernel/signaldata/LqhKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhKey.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhKey.hpp 2008-08-07 11:52:50 +0000
@@ -40,6 +40,10 @@ public:
STATIC_CONST( MaxKeyInfo = 4 );
STATIC_CONST( MaxAttrInfo = 5);
+ /* Long LQHKEYREQ definitions */
+ STATIC_CONST( KeyInfoSectionNum = 0 );
+ STATIC_CONST( AttrInfoSectionNum = 1 );
+
private:
/**
@@ -147,7 +151,8 @@ private:
/**
* Request Info
*
- * k = Key len - 10 Bits (0-9) max 1023
+ * k = Key len - (Short LQHKEYREQ only)
+ * 10 Bits (0-9) max 1023
* l = Last Replica No - 2 Bits -> Max 3 (10-11)
IF version < NDBD_ROWID_VERSION
@@ -158,7 +163,8 @@ private:
* s = Simple indicator - 1 Bit (18)
* o = Operation - 3 Bits (19-21)
* r = Sequence replica - 2 Bits (22-23)
- * a = Attr Info in LQHKEYREQ - 3 Bits (24-26)
+ * a = Attr Info in LQHKEYREQ - (Short LQHKEYREQ only)
+ 3 Bits (24-26)
* c = Same client and tc - 1 Bit (27)
* u = Read Len Return Ind - 1 Bit (28)
* m = Commit ack marker - 1 Bit (29)
@@ -167,10 +173,17 @@ private:
* g = gci flag - 1 Bit (12)
* n = NR copy - 1 Bit (13)
+ * Short LQHKEYREQ :
* 1111111111222222222233
* 01234567890123456789012345678901
* kkkkkkkkkklltttpdisooorraaacumxz
* kkkkkkkkkkllgn pdisooorraaacumxz
+ *
+ * Long LQHKEYREQ :
+ * 1111111111222222222233
+ * 01234567890123456789012345678901
+ * llgn pdisooorr cumxz
+ *
*/
#define RI_KEYLEN_SHIFT (0)
@@ -200,7 +213,8 @@ private:
/**
* Scan Info
*
- * a = Attr Len - 16 Bits -> max 65535 (0-15)
+ * a = Attr Len - (Short LQHKEYREQ only)
+ * 16 Bits -> max 65535 (0-15)
* p = Stored Procedure Ind - 1 Bit (16)
* d = Distribution key - 8 Bit -> max 255 (17-24)
* t = Scan take over indicator - 1 Bit (25)
@@ -208,7 +222,8 @@ private:
*
* 1111111111222222222233
* 01234567890123456789012345678901
- * aaaaaaaaaaaaaaaapddddddddtmm
+ * aaaaaaaaaaaaaaaapddddddddtmm (Short LQHKEYREQ)
+ * pddddddddtmm (Long LQHKEYREQ)
*/
#define SI_ATTR_LEN_MASK (65535)
=== modified file 'storage/ndb/include/kernel/signaldata/NextScan.hpp'
--- a/storage/ndb/include/kernel/signaldata/NextScan.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/NextScan.hpp 2008-08-07 11:52:50 +0000
@@ -47,6 +47,7 @@ class NextScanConf {
public:
// length is less if no keyinfo or no next result
STATIC_CONST( SignalLength = 11 );
+ STATIC_CONST( SignalLengthNoKeyInfo = 7 );
private:
Uint32 scanPtr; // scan record in LQH
Uint32 accOperationPtr;
=== modified file 'storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp 2008-08-07 11:52:50 +0000
@@ -21,10 +21,11 @@
class SignalDroppedRep {
/**
- * Reciver(s)
+ * Receiver(s)
*/
friend class SimulatedBlock;
friend class Dbtc;
+ friend class Dblqh;
/**
* Sender (TransporterCallback.cpp)
=== modified file 'storage/ndb/include/kernel/signaldata/TupKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/TupKey.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/TupKey.hpp 2008-08-07 11:52:50 +0000
@@ -35,7 +35,7 @@ class TupKeyReq {
friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
public:
- STATIC_CONST( SignalLength = 18 );
+ STATIC_CONST( SignalLength = 19 );
private:
@@ -60,6 +60,7 @@ private:
Uint32 disk_page;
Uint32 m_row_id_page_no;
Uint32 m_row_id_page_idx;
+ Uint32 attrInfoIVal;
};
class TupKeyConf {
=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h 2008-04-07 10:26:34 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h 2008-08-07 04:13:41 +0000
@@ -444,10 +444,6 @@ extern "C" {
int ndb_mgm_number_of_mgmd_in_connect_string(NdbMgmHandle handle);
int ndb_mgm_set_configuration_nodeid(NdbMgmHandle handle, int nodeid);
- int ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle);
- int ndb_mgm_get_connected_port(NdbMgmHandle handle);
- const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
- const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz);
/**
* Set local bindaddress
=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in 2008-06-18 09:01:43 +0000
+++ b/storage/ndb/include/ndb_version.h.in 2008-08-07 11:52:50 +0000
@@ -99,6 +99,8 @@ Uint32 ndbGetOwnVersion();
#define NDBD_MICRO_GCP_62 NDB_MAKE_VERSION(6,2,5)
#define NDBD_MICRO_GCP_63 NDB_MAKE_VERSION(6,3,2)
#define NDBD_RAW_LCP MAKE_VERSION(6,3,11)
+#define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
+
static
inline
=== modified file 'storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp 2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp 2008-08-07 06:24:52 +0000
@@ -309,6 +309,7 @@ int
NdbIndexScanOperation::setBound(const char* attr, int type, const void* value,
Uint32 len)
{
+ (void)len; // unused
return setBound(attr, type, value);
}
@@ -317,6 +318,7 @@ int
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* value,
Uint32 len)
{
+ (void)len; // unused
return setBound(anAttrId, type, value);
}
=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp 2008-08-07 13:17:36 +0000
@@ -1457,6 +1457,9 @@ inline
int
NdbOperation::checkMagicNumber(bool b)
{
+#ifndef NDB_NO_DROPPED_SIGNAL
+ (void)b; // unused param in this context
+#endif
if (theMagicNumber != 0xABCDEF01){
#ifdef NDB_NO_DROPPED_SIGNAL
if(b) abort();
@@ -1576,8 +1579,10 @@ NdbOperation::NdbCon(NdbTransaction* aNd
inline
int
-NdbOperation::equal(const char* anAttrName, const char* aValue, Uint32 len)
+NdbOperation::equal(const char* anAttrName, const char* aValue,
+ Uint32 len)
{
+ (void)len; // unused
return equal(anAttrName, aValue);
}
@@ -1611,8 +1616,10 @@ NdbOperation::equal(const char* anAttrNa
inline
int
-NdbOperation::equal(Uint32 anAttrId, const char* aValue, Uint32 len)
+NdbOperation::equal(Uint32 anAttrId, const char* aValue,
+ Uint32 len)
{
+ (void)len; // unused
return equal(anAttrId, aValue);
}
@@ -1646,8 +1653,10 @@ NdbOperation::equal(Uint32 anAttrId, Uin
inline
int
-NdbOperation::setValue(const char* anAttrName, const char* aValue, Uint32 len)
+NdbOperation::setValue(const char* anAttrName, const char* aValue,
+ Uint32 len)
{
+ (void)len; // unused
return setValue(anAttrName, aValue);
}
@@ -1695,8 +1704,10 @@ NdbOperation::setValue(const char* anAtt
inline
int
-NdbOperation::setValue(Uint32 anAttrId, const char* aValue, Uint32 len)
+NdbOperation::setValue(Uint32 anAttrId, const char* aValue,
+ Uint32 len)
{
+ (void)len; // unused
return setValue(anAttrId, aValue);
}
=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2008-02-19 15:00:29 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2008-08-07 06:24:52 +0000
@@ -1093,6 +1093,7 @@ inline
void
NdbTransaction::set_send_size(Uint32 send_size)
{
+ (void)send_size; //unused
return;
}
=== modified file 'storage/ndb/src/common/debugger/signaldata/LqhKey.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp 2008-08-07 11:52:50 +0000
@@ -145,12 +145,16 @@ printLQHKEYREQ(FILE * output, const Uint
fprintf(output, "H\'%.8x ", sig->variableData[nextPos]);
fprintf(output, "\n");
} else {
- fprintf(output, " InitialReadSize: %d InterpretedSize: %d "
- "FinalUpdateSize: %d FinalReadSize: %d SubroutineSize: %d\n",
- sig->variableData[nextPos+0], sig->variableData[nextPos+1],
- sig->variableData[nextPos+2], sig->variableData[nextPos+3],
- sig->variableData[nextPos+4]);
- nextPos += 5;
+ /* Only have section sizes if it's a short LQHKEYREQ */
+ if (LqhKeyReq::getAIInLqhKeyReq(reqInfo) == LqhKeyReq::MaxAttrInfo)
+ {
+ fprintf(output, " InitialReadSize: %d InterpretedSize: %d "
+ "FinalUpdateSize: %d FinalReadSize: %d SubroutineSize: %d\n",
+ sig->variableData[nextPos+0], sig->variableData[nextPos+1],
+ sig->variableData[nextPos+2], sig->variableData[nextPos+3],
+ sig->variableData[nextPos+4]);
+ nextPos += 5;
+ }
}
return true;
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-08-07 11:52:50 +0000
@@ -1920,9 +1920,9 @@ public:
LOG_CONNECTED = 3
};
ConnectState connectState;
- UintR copyCountWords;
- UintR firstAttrinfo[5];
- UintR tupkeyData[4];
+ UintR copyCountWords;
+ Uint32 keyInfoIVal;
+ Uint32 attrInfoIVal;
UintR transid[2];
AbortState abortState;
UintR accConnectrec;
@@ -1931,15 +1931,15 @@ public:
UintR tcTimer;
UintR currReclenAi;
UintR currTupAiLen;
- UintR firstAttrinbuf;
- UintR firstTupkeybuf;
+ UintR firstAttrinbuf; // TODO : Remove
+ UintR firstTupkeybuf; // TODO : Remove
UintR fragmentid;
UintR fragmentptr;
UintR gci_hi;
UintR gci_lo;
UintR hashValue;
- UintR lastTupkeybuf;
- UintR lastAttrinbuf;
+ UintR lastTupkeybuf; // TODO : Remove
+ UintR lastAttrinbuf; // TODO : Remove
/**
* Each operation (TcConnectrec) can be stored in max one out of many
* lists.
@@ -2013,6 +2013,10 @@ public:
Uint8 m_disk_table;
Uint8 m_use_rowid;
Uint8 m_dealloc;
+ enum op_flags {
+ OP_ISLONGREQ = 0x1,
+ OP_SAVEATTRINFO = 0x2};
+ Uint32 m_flags;
Uint32 m_log_part_ptr_i;
Local_key m_row_id;
@@ -2079,6 +2083,7 @@ private:
void execEXEC_SRREQ(Signal* signal);
void execEXEC_SRCONF(Signal* signal);
void execREAD_PSEUDO_REQ(Signal* signal);
+ void execSIGNAL_DROPPED_REP(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
@@ -2324,7 +2329,7 @@ private:
void readExecSrNewMbyte(Signal* signal);
void readExecSr(Signal* signal);
void readKey(Signal* signal);
- void readLogData(Signal* signal, Uint32 noOfWords, Uint32* dataPtr);
+ void readLogData(Signal* signal, Uint32 noOfWords, Uint32& sectionIVal);
void readLogHeader(Signal* signal);
Uint32 readLogword(Signal* signal);
Uint32 readLogwordExec(Signal* signal);
@@ -2343,6 +2348,7 @@ private:
void removePageRef(Signal* signal);
Uint32 returnExecLog(Signal* signal);
int saveTupattrbuf(Signal* signal, Uint32* dataPtr, Uint32 length);
+ int saveAttrInfoInSection(const Uint32* dataPtr, Uint32 len);
void seizeAddfragrec(Signal* signal);
void seizeAttrinbuf(Signal* signal);
Uint32 seize_attrinbuf();
@@ -2370,6 +2376,7 @@ private:
void writeKey(Signal* signal);
void writeLogHeader(Signal* signal);
void writeLogWord(Signal* signal, Uint32 data);
+ void writeLogWords(Signal* signal, const Uint32* data, Uint32 len);
void writeNextLog(Signal* signal);
void errorReport(Signal* signal, int place);
void warningReport(Signal* signal, int place);
@@ -2465,7 +2472,7 @@ private:
void nextScanConfScanLab(Signal* signal);
void nextScanConfCopyLab(Signal* signal);
void continueScanNextReqLab(Signal* signal);
- void keyinfoLab(const Uint32 * src, const Uint32 * end);
+ bool keyinfoLab(const Uint32 * src, Uint32 len);
void copySendTupkeyReqLab(Signal* signal);
void storedProcConfScanLab(Signal* signal);
void storedProcConfCopyLab(Signal* signal);
@@ -2576,6 +2583,8 @@ private:
void exec_acckeyreq(Signal*, Ptr<TcConnectionrec>);
int compare_key(const TcConnectionrec*, const Uint32 * ptr, Uint32 len);
void nr_copy_delete_row(Signal*, Ptr<TcConnectionrec>, Local_key*, Uint32);
+ Uint32 getKeyInfoWordOrZero(const TcConnectionrec* regTcPtr,
+ Uint32 offset);
public:
struct Nr_op_info
{
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp 2008-08-07 11:52:50 +0000
@@ -200,6 +200,8 @@ Dblqh::Dblqh(Block_context& ctx, Uint32
addRecSignal(GSN_ALTER_TAB_REQ, &Dblqh::execALTER_TAB_REQ);
+ addRecSignal(GSN_SIGNAL_DROPPED_REP, &Dblqh::execSIGNAL_DROPPED_REP, true);
+
// Trigger signals, transit to from TUP
addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &Dblqh::execCREATE_TRIG_IMPL_REQ);
addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &Dblqh::execCREATE_TRIG_IMPL_CONF);
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-07-23 11:36:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-08-07 11:52:50 +0000
@@ -63,9 +63,12 @@
#include <signaldata/RestoreImpl.hpp>
#include <signaldata/KeyInfo.hpp>
#include <signaldata/AttrInfo.hpp>
+#include <signaldata/TransIdAI.hpp>
#include <KeyDescriptor.hpp>
#include <signaldata/RouteOrd.hpp>
#include <signaldata/FsRef.hpp>
+#include <SectionReader.hpp>
+#include <signaldata/SignalDroppedRep.hpp>
#include "../suma/Suma.hpp"
@@ -214,11 +217,6 @@ void Dblqh::execCONTINUEB(Signal* signal
ndbout << " tcNodeFailrec = " << tcConnectptr.p->tcNodeFailrec;
ndbout << " activeCreat = " << tcConnectptr.p->activeCreat;
ndbout << endl;
- ndbout << "tupkeyData0 = " << tcConnectptr.p->tupkeyData[0];
- ndbout << "tupkeyData1 = " << tcConnectptr.p->tupkeyData[1];
- ndbout << "tupkeyData2 = " << tcConnectptr.p->tupkeyData[2];
- ndbout << "tupkeyData3 = " << tcConnectptr.p->tupkeyData[3];
- ndbout << endl;
ndbout << "abortState = " << tcConnectptr.p->abortState;
ndbout << "listState = " << tcConnectptr.p->listState;
ndbout << endl;
@@ -3185,25 +3183,45 @@ Uint32 Dblqh::handleLongTupKey(Signal* s
Uint32 len)
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Uint32 dataPos = 0;
+ TcConnectionrec::TransactionState state = regTcPtr->transactionState;
Uint32 total = regTcPtr->save1 + len;
Uint32 primKeyLen = regTcPtr->primKeyLen;
- while (dataPos < len) {
- if (cfirstfreeDatabuf == RNIL) {
+ if (state == TcConnectionrec::WAIT_TUPKEYINFO)
+ {
+ /* Different handling for TUPKEYREQ to SCANAI
+ * TODO : Converge once long scan implemented
+ */
+ bool ok= appendToSection(regTcPtr->keyInfoIVal,
+ dataPtr,
+ len);
+ if (unlikely(!ok))
+ {
jam();
return ZGET_DATAREC_ERROR;
- }//if
- seizeTupkeybuf(signal);
- Databuf * const regDataPtr = databufptr.p;
- Uint32 data0 = dataPtr[dataPos];
- Uint32 data1 = dataPtr[dataPos + 1];
- Uint32 data2 = dataPtr[dataPos + 2];
- Uint32 data3 = dataPtr[dataPos + 3];
- regDataPtr->data[0] = data0;
- regDataPtr->data[1] = data1;
- regDataPtr->data[2] = data2;
- regDataPtr->data[3] = data3;
- dataPos += 4;
+ }
+ }
+ else
+ {
+ /* Scan_AI KeyInfo */
+ Uint32 dataPos = 0;
+
+ while (dataPos < len) {
+ if (cfirstfreeDatabuf == RNIL) {
+ jam();
+ return ZGET_DATAREC_ERROR;
+ }//if
+ seizeTupkeybuf(signal);
+ Databuf * const regDataPtr = databufptr.p;
+ Uint32 data0 = dataPtr[dataPos];
+ Uint32 data1 = dataPtr[dataPos + 1];
+ Uint32 data2 = dataPtr[dataPos + 2];
+ Uint32 data3 = dataPtr[dataPos + 3];
+ regDataPtr->data[0] = data0;
+ regDataPtr->data[1] = data1;
+ regDataPtr->data[2] = data2;
+ regDataPtr->data[3] = data3;
+ dataPos += 4;
+ }
}
regTcPtr->save1 = total;
@@ -3329,23 +3347,42 @@ Dblqh::receive_attrinfo(Signal* signal,
void Dblqh::execTUP_ATTRINFO(Signal* signal)
{
TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- Uint32 length = signal->length() - 3;
Uint32 tcIndex = signal->theData[0];
Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
jamEntry();
tcConnectptr.i = tcIndex;
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
- ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::WAIT_TUP);
- if (saveTupattrbuf(signal, &signal->theData[3], length) == ZOK) {
- return;
- } else {
- jam();
-/* ------------------------------------------------------------------------- */
-/* WE ARE WAITING FOR RESPONSE FROM TUP HERE. THUS WE NEED TO */
-/* GO THROUGH THE STATE MACHINE FOR THE OPERATION. */
-/* ------------------------------------------------------------------------- */
- localAbortStateHandlerLab(signal);
- }//if
+ TcConnectionrec * const regTcPtr = tcConnectptr.p;
+
+ ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP);
+
+ /* TUP_ATTRINFO signal is unrelated to ATTRINFO
+ * It just transports a section IVAL from TUP back to
+ * LQH
+ */
+ ndbrequire(signal->header.theLength == 3);
+ Uint32 tupAttrInfoWords= signal->theData[1];
+ Uint32 tupAttrInfoIVal= signal->theData[2];
+
+ ndbassert(tupAttrInfoWords > 0);
+ ndbassert(tupAttrInfoIVal != RNIL);
+
+ /* If we have stored ATTRINFO that we sent to TUP,
+ * free it now
+ */
+ if (regTcPtr->attrInfoIVal != RNIL)
+ {
+ /* We should be expecting to receive attrInfo back */
+ ndbassert( !(regTcPtr->m_flags &
+ TcConnectionrec::OP_SAVEATTRINFO) );
+ releaseSection( regTcPtr->attrInfoIVal );
+ regTcPtr->attrInfoIVal= RNIL;
+ }
+
+ /* Store reference to ATTRINFO from TUP */
+ regTcPtr->attrInfoIVal= tupAttrInfoIVal;
+ regTcPtr->currTupAiLen= tupAttrInfoWords;
+
}//Dblqh::execTUP_ATTRINFO()
/* ------------------------------------------------------------------------- */
@@ -3354,26 +3391,19 @@ void Dblqh::execTUP_ATTRINFO(Signal* sig
/* ------------------------------------------------------------------------- */
void Dblqh::lqhAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
{
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->operation != ZREAD) {
- if (regTcPtr->operation != ZDELETE)
- {
- if (regTcPtr->opExec != 1) {
- if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
- ;
- } else {
- jam();
+ /* Store received AttrInfo in a long section */
+ jam();
+ if (saveAttrInfoInSection(dataPtr, length) == ZOK) {
+ ;
+ } else {
+ jam();
/* ------------------------------------------------------------------------- */
/* WE MIGHT BE WAITING FOR RESPONSE FROM SOME BLOCK HERE. THUS WE NEED TO */
/* GO THROUGH THE STATE MACHINE FOR THE OPERATION. */
/* ------------------------------------------------------------------------- */
- localAbortStateHandlerLab(signal);
- return;
- }//if
- }//if
- }//if
- }
- c_tup->receive_attrinfo(signal, regTcPtr->tupConnectrec, dataPtr, length);
+ localAbortStateHandlerLab(signal);
+ return;
+ }//if
}//Dblqh::lqhAttrinfoLab()
/* ------------------------------------------------------------------------- */
@@ -3438,6 +3468,32 @@ int Dblqh::saveTupattrbuf(Signal* signal
return ZOK;
}//Dblqh::saveTupattrbuf()
+
+/* ------------------------------------------------------------------------- */
+/* ------- SAVE ATTRINFO INTO ATTR SECTION ------- */
+/* */
+/* ------------------------------------------------------------------------- */
+int Dblqh::saveAttrInfoInSection(const Uint32* dataPtr, Uint32 len)
+{
+ TcConnectionrec * const regTcPtr = tcConnectptr.p;
+
+ bool ok= appendToSection(regTcPtr->attrInfoIVal,
+ dataPtr,
+ len);
+
+ if (unlikely(!ok))
+ {
+ jam();
+ terrorCode = ZGET_ATTRINBUF_ERROR;
+ return ZGET_ATTRINBUF_ERROR;
+ }//if
+
+ regTcPtr->currTupAiLen+= len;
+
+ return ZOK;
+} // saveAttrInfoInSection
+
+
/* ==========================================================================
* ======= SEIZE ATTRIBUTE IN BUFFER =======
*
@@ -3467,6 +3523,7 @@ void Dblqh::seizeAttrinbuf(Signal* signa
attrinbufptr = regAttrinbufptr;
}//Dblqh::seizeAttrinbuf()
+
/* ==========================================================================
* ======= SEIZE TC CONNECT RECORD =======
*
@@ -3551,6 +3608,47 @@ bool Dblqh::checkTransporterOverloaded(S
return !mask.isclear();
}
+void Dblqh::execSIGNAL_DROPPED_REP(Signal* signal)
+{
+ /* An incoming signal was dropped, handle it
+ * Dropped signal really means that we ran out of
+ * long signal buffering to store its sections
+ */
+ jamEntry();
+
+ const SignalDroppedRep* rep = (SignalDroppedRep*) &signal->theData[0];
+ Uint32 originalGSN= rep->originalGsn;
+
+ DEBUG("SignalDroppedRep received for GSN " << originalGSN);
+
+ switch(originalGSN) {
+ case GSN_LQHKEYREQ:
+ {
+ jam();
+ /* Get original signal data - unfortunately it may
+ * have been truncated. We must not read beyond
+ * word # 22
+ * We will notify the client that their LQHKEYREQ
+ * failed
+ */
+ const LqhKeyReq * const truncatedLqhKeyReq =
+ (LqhKeyReq *) &rep->originalData[0];
+
+ noFreeRecordLab(signal, truncatedLqhKeyReq, ZGET_DATAREC_ERROR);
+
+ break;
+ }
+ default:
+ jam();
+ /* Don't expect dropped signals for other GSNs,
+ * default handling
+ */
+ SimulatedBlock::execSIGNAL_DROPPED_REP(signal);
+ };
+
+ return;
+}
+
/* ------------------------------------------------------------------------- */
/* ------- TAKE CARE OF LQHKEYREQ ------- */
/* LQHKEYREQ IS THE SIGNAL THAT STARTS ALL OPERATIONS IN THE LQH BLOCK */
@@ -3563,6 +3661,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
Uint8 tfragDistKey;
const LqhKeyReq * const lqhKeyReq = (LqhKeyReq *)signal->getDataPtr();
+ SectionHandle handle(this, signal);
{
const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
@@ -3570,6 +3669,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
checkTransporterOverloaded(signal, all, lqhKeyReq) ||
ERROR_INSERTED_CLEAR(5047)) {
jam();
+ releaseSections(handle);
noFreeRecordLab(signal, lqhKeyReq, ZTRANSPORTER_OVERLOADED_ERROR);
return;
}
@@ -3586,6 +3686,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
if (ERROR_INSERTED(5031)) {
CLEAR_ERROR_INSERT_VALUE;
}
+ releaseSections(handle);
noFreeRecordLab(signal, lqhKeyReq, ZNO_TC_CONNECT_ERROR);
return;
}//if
@@ -3593,6 +3694,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
if(ERROR_INSERTED(5038) &&
refToNode(signal->getSendersBlockRef()) != getOwnNodeId()){
jam();
+ releaseSections(handle);
SET_ERROR_INSERT_VALUE(5039);
return;
}
@@ -3604,8 +3706,15 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->clientConnectrec = sig0;
regTcPtr->tcOprec = sig0;
regTcPtr->storedProcId = ZNIL;
+ regTcPtr->m_flags= 0;
+ bool isLongReq= false;
+ if (handle.m_cnt > 0)
+ {
+ isLongReq= true;
+ regTcPtr->m_flags|= TcConnectionrec::OP_ISLONGREQ;
+ }
- UintR TtotReclenAi = lqhKeyReq->attrLen;
+ UintR attrLenFlags = lqhKeyReq->attrLen;
sig1 = lqhKeyReq->savePointId;
sig2 = lqhKeyReq->hashValue;
UintR Treqinfo = lqhKeyReq->requestInfo;
@@ -3620,16 +3729,16 @@ void Dblqh::execLQHKEYREQ(Signal* signal
const Uint8 op = LqhKeyReq::getOperation(Treqinfo);
if ((op == ZREAD || op == ZREAD_EX) && !getAllowRead()){
+ releaseSections(handle);
noFreeRecordLab(signal, lqhKeyReq, ZNODE_SHUTDOWN_IN_PROGESS);
return;
}
Uint32 senderVersion = getNodeInfo(refToNode(senderRef)).m_version;
- regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(TtotReclenAi);
regTcPtr->tcScanInfo = lqhKeyReq->scanInfo;
- regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(TtotReclenAi);
- regTcPtr->m_reorg = LqhKeyReq::getReorgFlag(TtotReclenAi);
+ regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(attrLenFlags);
+ regTcPtr->m_reorg = LqhKeyReq::getReorgFlag(attrLenFlags);
regTcPtr->readlenAi = 0;
regTcPtr->currTupAiLen = 0;
@@ -3674,6 +3783,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
if (markerPtr.i == RNIL)
{
noFreeRecordLab(signal, lqhKeyReq, ZNO_FREE_MARKER_RECORDS_ERROR);
+ releaseSections(handle);
return;
}
markerPtr.p->transid1 = sig1;
@@ -3698,7 +3808,6 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->opExec = LqhKeyReq::getInterpretedFlag(Treqinfo);
regTcPtr->opSimple = LqhKeyReq::getSimpleFlag(Treqinfo);
regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo);
- UintR TreclenAiLqhkey = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
regTcPtr->apiVersionNo = 0;
regTcPtr->m_use_rowid = LqhKeyReq::getRowidFlag(Treqinfo);
regTcPtr->m_dealloc = 0;
@@ -3722,11 +3831,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
CRASH_INSERTION2(5041, (op == ZREAD &&
(regTcPtr->opSimple || regTcPtr->dirtyOp) &&
refToNode(signal->senderBlockRef()) != cownNodeid));
-
- regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
- regTcPtr->currReclenAi = TreclenAiLqhkey;
- UintR TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
- regTcPtr->primKeyLen = TitcKeyLen;
+
regTcPtr->noFiredTriggers = lqhKeyReq->noFiredTriggers;
UintR TapplAddressInd = LqhKeyReq::getApplicationAddressFlag(Treqinfo);
@@ -3743,7 +3848,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->nodeAfterNext[1] = lqhKeyReq->variableData[nextPos] >> 16;
nextPos++;
}//if
- UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(TtotReclenAi);
+ UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(attrLenFlags);
if (TstoredProcIndicator == 1) {
regTcPtr->storedProcId = lqhKeyReq->variableData[nextPos] & ZNIL;
nextPos++;
@@ -3753,29 +3858,82 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->readlenAi = lqhKeyReq->variableData[nextPos] & ZNIL;
nextPos++;
}//if
- sig0 = lqhKeyReq->variableData[nextPos + 0];
- sig1 = lqhKeyReq->variableData[nextPos + 1];
- sig2 = lqhKeyReq->variableData[nextPos + 2];
- sig3 = lqhKeyReq->variableData[nextPos + 3];
- regTcPtr->tupkeyData[0] = sig0;
- regTcPtr->tupkeyData[1] = sig1;
- regTcPtr->tupkeyData[2] = sig2;
- regTcPtr->tupkeyData[3] = sig3;
-
- if (TitcKeyLen > 0) {
- if (TitcKeyLen < 4) {
- nextPos += TitcKeyLen;
- } else {
- nextPos += 4;
- }//if
- }
- else if (! (LqhKeyReq::getNrCopyFlag(Treqinfo)))
+ UintR TitcKeyLen = 0;
+ Uint32 keyLenWithLQHReq = 0;
+ UintR TreclenAiLqhkey = 0;
+
+ if (isLongReq)
+ {
+ /* Long LQHKEYREQ indicates Key and AttrInfo presence and
+ * size via section lengths
+ */
+ SegmentedSectionPtr keyInfoSection, attrInfoSection;
+
+ handle.getSection(keyInfoSection,
+ LqhKeyReq::KeyInfoSectionNum);
+
+ ndbassert(keyInfoSection.i != RNIL);
+
+ regTcPtr->keyInfoIVal= keyInfoSection.i;
+ TitcKeyLen= keyInfoSection.sz;
+ keyLenWithLQHReq= TitcKeyLen;
+
+ Uint32 totalAttrInfoLen= 0;
+ if (handle.getSection(attrInfoSection,
+ LqhKeyReq::AttrInfoSectionNum))
+ {
+ regTcPtr->attrInfoIVal= attrInfoSection.i;
+ totalAttrInfoLen= attrInfoSection.sz;
+ }
+
+ regTcPtr->reclenAiLqhkey = 0;
+ regTcPtr->currReclenAi = totalAttrInfoLen;
+ regTcPtr->totReclenAi = totalAttrInfoLen;
+
+ /* Detach sections from the handle, we are now responsible
+ * for freeing them when appropriate
+ */
+ handle.clear();
+ }
+ else
+ {
+ /* Short LQHKEYREQ, Key and Attr sizes are in
+ * signal, along with some data
+ */
+ TreclenAiLqhkey= LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
+ regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
+ regTcPtr->currReclenAi = TreclenAiLqhkey;
+ TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
+ regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(attrLenFlags);
+
+ /* Note key can be length zero for NR when Rowid used */
+ keyLenWithLQHReq= MIN(TitcKeyLen, LqhKeyReq::MaxKeyInfo);
+
+ bool ok= appendToSection(regTcPtr->keyInfoIVal,
+ &lqhKeyReq->variableData[ nextPos ],
+ keyLenWithLQHReq);
+ if (unlikely(!ok))
+ {
+ jam();
+ terrorCode= ZGET_DATAREC_ERROR;
+ abortErrorLab(signal);
+ return;
+ }
+
+ nextPos+= keyLenWithLQHReq;
+ }
+
+ regTcPtr->primKeyLen = TitcKeyLen;
+
+ /* Only node restart copy allowed to send no KeyInfo */
+ if (( keyLenWithLQHReq == 0 ) &&
+ (! (LqhKeyReq::getNrCopyFlag(Treqinfo))))
{
LQHKEY_error(signal, 3);
return;
}//if
-
+
sig0 = lqhKeyReq->variableData[nextPos + 0];
sig1 = lqhKeyReq->variableData[nextPos + 1];
regTcPtr->m_row_id.m_page_no = sig0;
@@ -3892,7 +4050,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->replicaType = TcopyType;
regTcPtr->fragmentptr = fragptr.i;
regTcPtr->m_log_part_ptr_i = logPart;
- Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
+ Uint8 TdistKey = LqhKeyReq::getDistributionKey(attrLenFlags);
if ((tfragDistKey != TdistKey) &&
(regTcPtr->seqNoReplica == 0) &&
(regTcPtr->dirtyOp == ZFALSE))
@@ -3909,81 +4067,122 @@ void Dblqh::execLQHKEYREQ(Signal* signal
tmp = (tmp < 0 ? - tmp : tmp);
if ((tmp <= 1) || (tfragDistKey == 0)) {
LQHKEY_abort(signal, 0);
- return;
}//if
LQHKEY_error(signal, 1);
+ return;
}//if
- if (TreclenAiLqhkey != 0) {
- if (regTcPtr->operation != ZREAD) {
- if (regTcPtr->operation != ZDELETE) {
- if (regTcPtr->opExec != 1) {
- jam();
-/*---------------------------------------------------------------------------*/
-/* */
-/* UPDATES, WRITES AND INSERTS THAT ARE NOT INTERPRETED WILL USE THE */
-/* SAME ATTRINFO IN ALL REPLICAS. THUS WE SAVE THE ATTRINFO ALREADY */
-/* TO SAVE A SIGNAL FROM TUP TO LQH. INTERPRETED EXECUTION IN TUP */
-/* WILL CREATE NEW ATTRINFO FOR THE OTHER REPLICAS AND IT IS THUS NOT */
-/* A GOOD IDEA TO SAVE THE INFORMATION HERE. READS WILL ALSO BE */
-/* UNNECESSARY TO SAVE SINCE THAT ATTRINFO WILL NEVER BE SENT TO ANY */
-/* MORE REPLICAS. */
-/*---------------------------------------------------------------------------*/
-/* READS AND DELETES CAN ONLY HAVE INFORMATION ABOUT WHAT IS TO BE READ. */
-/* NO INFORMATION THAT NEEDS LOGGING. */
-/*---------------------------------------------------------------------------*/
- sig0 = lqhKeyReq->variableData[nextPos + 0];
- sig1 = lqhKeyReq->variableData[nextPos + 1];
- sig2 = lqhKeyReq->variableData[nextPos + 2];
- sig3 = lqhKeyReq->variableData[nextPos + 3];
- sig4 = lqhKeyReq->variableData[nextPos + 4];
-
- regTcPtr->firstAttrinfo[0] = sig0;
- regTcPtr->firstAttrinfo[1] = sig1;
- regTcPtr->firstAttrinfo[2] = sig2;
- regTcPtr->firstAttrinfo[3] = sig3;
- regTcPtr->firstAttrinfo[4] = sig4;
- regTcPtr->currTupAiLen = TreclenAiLqhkey;
- } else {
- jam();
- regTcPtr->reclenAiLqhkey = 0;
- }//if
- } else {
+
+ /*
+ * Interpreted updates and deletes may require different AttrInfo in
+ * different replicas, as only the primary executes the interpreted
+ * program, and the effect of the program rather than the program
+ * should be logged.
+ * Non interpreted inserts, updates, writes and deletes use the same
+ * AttrInfo in all replicas.
+ * All reads only run on one replica, and are not logged.
+ * The AttrInfo section is passed to TUP attached to the TUPKEYREQ
+ * signal below.
+ *
+ * Normal processing :
+ * - LQH passes ATTRINFO section to TUP attached to direct TUPKEYREQ
+ * signal
+ * - TUP processes request and sends direct TUPKEYCONF back to LQH
+ * - LQH continues processing (logging, forwarding LQHKEYREQ to other
+ * replicas as necessary)
+ * - LQH frees ATTRINFO section
+ * Note that TUP is not responsible for freeing the passed ATTRINFO
+ * section, LQH is.
+ *
+ * Interpreted Update / Delete processing
+ * - LQH passes ATTRINFO section to TUP attached to direct TUPKEYREQ
+ * signal
+ * - TUP processes request, generating new ATTRINFO data
+ * - If new AttrInfo data is > 0 words, TUP sends it back to LQH as
+ * a long section attached to a single ATTRINFO signal.
+ * - LQH frees the original AttrInfo section and stores a ref to
+ * the new section
+ * - TUP sends direct TUPKEYCONF back to LQH with new ATTRINFO length
+ * - If the new ATTRINFO is > 0 words,
+ * - LQH continues processing with it (logging, forwarding
+ * LQHKEYREQ to other replicas as necessary)
+ * - LQH frees the new ATTRINFO section
+ * - If the new ATTRINFO is 0 words, LQH frees the original ATTRINFO
+ * section and continues processing (logging, forwarding LQHKEYREQ
+ * to other replicas as necessary)
+ *
+ */
+ bool attrInfoToPropagate=
+ (regTcPtr->totReclenAi != 0) &&
+ (regTcPtr->operation != ZREAD) &&
+ (regTcPtr->operation != ZDELETE);
+ bool tupCanChangePropagatedAttrInfo= (regTcPtr->opExec == 1);
+
+ bool saveAttrInfo=
+ attrInfoToPropagate &&
+ (! tupCanChangePropagatedAttrInfo);
+
+ if (saveAttrInfo)
+ regTcPtr->m_flags|= TcConnectionrec::OP_SAVEATTRINFO;
+
+
+ /* Handle any AttrInfo we received with the LQHKEYREQ */
+ if (regTcPtr->currReclenAi != 0)
+ {
+ jam();
+ if (isLongReq)
+ {
+ /* Long LQHKEYREQ */
+ jam();
+
+ regTcPtr->currTupAiLen= saveAttrInfo ?
+ regTcPtr->totReclenAi :
+ 0;
+ }
+ else
+ {
+ /* Short LQHKEYREQ */
+ jam();
+
+ /* Lets put the AttrInfo into a segmented section */
+ bool ok= appendToSection(regTcPtr->attrInfoIVal,
+ lqhKeyReq->variableData + nextPos,
+ TreclenAiLqhkey);
+ if (unlikely(!ok))
+ {
jam();
- regTcPtr->reclenAiLqhkey = 0;
- }//if
- }//if
- sig0 = lqhKeyReq->variableData[nextPos + 0];
- sig1 = lqhKeyReq->variableData[nextPos + 1];
- sig2 = lqhKeyReq->variableData[nextPos + 2];
- sig3 = lqhKeyReq->variableData[nextPos + 3];
- sig4 = lqhKeyReq->variableData[nextPos + 4];
-
- c_tup->receive_attrinfo(signal, regTcPtr->tupConnectrec,
- lqhKeyReq->variableData+nextPos, TreclenAiLqhkey);
-
- if (signal->theData[0] == (UintR)-1) {
- LQHKEY_abort(signal, 2);
- return;
- }//if
+ terrorCode= ZGET_DATAREC_ERROR;
+ abortErrorLab(signal);
+ return;
+ }
+
+ regTcPtr->currTupAiLen= TreclenAiLqhkey;
+ }
}//if
-/* ------- TAKE CARE OF PRIM KEY DATA ------- */
- if (regTcPtr->primKeyLen <= 4) {
+
+ /* If we've received all KeyInfo, proceed with processing,
+ * otherwise wait for discrete KeyInfo signals
+ */
+ if (regTcPtr->primKeyLen == keyLenWithLQHReq) {
endgettupkeyLab(signal);
return;
} else {
jam();
-/*--------------------------------------------------------------------*/
-/* KEY LENGTH WAS MORE THAN 4 WORDS (WORD = 4 BYTE). THUS WE */
-/* HAVE TO ALLOCATE A DATA BUFFER TO STORE THE KEY DATA AND */
-/* WAIT FOR THE KEYINFO SIGNAL. */
-/*--------------------------------------------------------------------*/
- regTcPtr->save1 = 4;
+ ndbassert(!isLongReq);
+ /* Wait for remaining KeyInfo */
+ regTcPtr->save1 = keyLenWithLQHReq;
regTcPtr->transactionState = TcConnectionrec::WAIT_TUPKEYINFO;
return;
}//if
return;
}//Dblqh::execLQHKEYREQ()
+
+
+/**
+ * endgettupkeyLab
+ * Invoked when all KeyInfo and/or all AttrInfo has been
+ * received
+ */
void Dblqh::endgettupkeyLab(Signal* signal)
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
@@ -3991,7 +4190,10 @@ void Dblqh::endgettupkeyLab(Signal* sign
;
} else {
jam();
+ /* Wait for discrete AttrInfo signals */
ndbrequire(regTcPtr->currReclenAi < regTcPtr->totReclenAi);
+ ndbassert( !(regTcPtr->m_flags &
+ TcConnectionrec::OP_ISLONGREQ) );
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return;
}//if
@@ -4119,7 +4321,7 @@ void Dblqh::prepareContinueAfterBlockedL
TRACENR(" NrCopy");
if (LqhKeyReq::getRowidFlag(regTcPtr->reqinfo))
TRACENR(" rowid: " << regTcPtr->m_row_id);
- TRACENR(" key: " << regTcPtr->tupkeyData[0]);
+ TRACENR(" key: " << getKeyInfoWordOrZero(regTcPtr, 0));
}
if (likely(activeCreat == Fragrecord::AC_NORMAL))
@@ -4180,21 +4382,13 @@ Dblqh::exec_acckeyreq(Signal* signal, Tc
signal->theData[5] = sig4;
sig0 = regTcPtr.p->transid[1];
- sig1 = regTcPtr.p->tupkeyData[0];
- sig2 = regTcPtr.p->tupkeyData[1];
- sig3 = regTcPtr.p->tupkeyData[2];
- sig4 = regTcPtr.p->tupkeyData[3];
signal->theData[6] = sig0;
- signal->theData[7] = sig1;
- signal->theData[8] = sig2;
- signal->theData[9] = sig3;
- signal->theData[10] = sig4;
+
+ /* Copy KeyInfo to end of ACCKEYREQ signal, starting at offset 7 */
+ sendKeyinfoAcc(signal, 7);
TRACE_OP(regTcPtr.p, "ACC");
- if (regTcPtr.p->primKeyLen > 4) {
- sendKeyinfoAcc(signal, 11);
- }//if
EXECUTE_DIRECT(refToBlock(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ,
signal, 7 + regTcPtr.p->primKeyLen);
if (signal->theData[0] < RNIL) {
@@ -4266,7 +4460,7 @@ Dblqh::handle_nr_copy(Signal* signal, Pt
* Case 4
* Perform delete using rowid
* primKeyLen == 0
- * tupkeyData[0] == rowid
+ * key[0] == rowid
*/
jam();
ndbassert(regTcPtr.p->primKeyLen == 0);
@@ -4380,6 +4574,10 @@ update_gci_ignore:
packLqhkeyreqLab(signal);
}
+/**
+ * Compare received key data with the data supplied
+ * returning 0 if they are the same, 1 otherwise
+ */
int
Dblqh::compare_key(const TcConnectionrec* regTcPtr,
const Uint32 * ptr, Uint32 len)
@@ -4387,31 +4585,29 @@ Dblqh::compare_key(const TcConnectionrec
if (regTcPtr->primKeyLen != len)
return 1;
- if (len <= 4)
- return memcmp(ptr, regTcPtr->tupkeyData, 4*len);
-
- if (memcmp(ptr, regTcPtr->tupkeyData, sizeof(regTcPtr->tupkeyData)))
- return 1;
+ ndbassert( regTcPtr->keyInfoIVal != RNIL );
+
+ SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+ getSectionSegmentPool());
- len -= (sizeof(regTcPtr->tupkeyData) >> 2);
- ptr += (sizeof(regTcPtr->tupkeyData) >> 2);
+ ndbassert(regTcPtr->primKeyLen == keyInfoReader.getSize());
- DatabufPtr regDatabufptr;
- regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- while(len > 4)
+ while (len != 0)
{
- if (memcmp(ptr, regDatabufptr.p, 4*4))
- return 1;
+ const Uint32* keyChunk= NULL;
+ Uint32 chunkSize= 0;
- ptr += 4;
- len -= 4;
- regDatabufptr.i = regDatabufptr.p->nextDatabuf;
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- }
+ /* Get a ptr to a chunk of contiguous words to compare */
+ bool ok= keyInfoReader.getWordsPtr(len, keyChunk, chunkSize);
- if (memcmp(ptr, regDatabufptr.p, 4*len))
- return 1;
+ ndbrequire(ok);
+
+ if ( memcmp(ptr, keyChunk, chunkSize << 2))
+ return 1;
+
+ ptr+= chunkSize;
+ len-= chunkSize;
+ }
return 0;
}
@@ -4454,12 +4650,11 @@ Dblqh::nr_copy_delete_row(Signal* signal
keylen = regTcPtr.p->primKeyLen;
signal->theData[3] = regTcPtr.p->hashValue;
signal->theData[4] = keylen;
- signal->theData[7] = regTcPtr.p->tupkeyData[0];
- signal->theData[8] = regTcPtr.p->tupkeyData[1];
- signal->theData[9] = regTcPtr.p->tupkeyData[2];
- signal->theData[10] = regTcPtr.p->tupkeyData[3];
- if (keylen > 4)
- sendKeyinfoAcc(signal, 11);
+
+ /* Copy KeyInfo inline into the ACCKEYREQ signal,
+ * starting at word 7
+ */
+ sendKeyinfoAcc(signal, 7);
}
const Uint32 ref = refToBlock(regTcPtr.p->tcAccBlockref);
EXECUTE_DIRECT(ref, GSN_ACCKEYREQ, signal, 7 + keylen);
@@ -4637,19 +4832,7 @@ Dblqh::readPrimaryKeys(Uint32 opPtrI, Ui
regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
- memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
- if (keyLen > 4)
- {
- tmp += 4;
- Uint32 pos = 4;
- do {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
- regDatabufptr.i = regDatabufptr.p->nextDatabuf;
- tmp += sizeof(regDatabufptr.p->data) >> 2;
- pos += sizeof(regDatabufptr.p->data) >> 2;
- } while(pos < keyLen);
- }
+ copy(tmp, regTcPtr.p->keyInfoIVal);
if (xfrm)
{
@@ -4661,29 +4844,42 @@ Dblqh::readPrimaryKeys(Uint32 opPtrI, Ui
return keyLen;
}
+/**
+ * getKeyInfoWordOrZero
+ * Get given word of KeyInfo, or zero if it's not available
+ * Used for tracing
+ */
+Uint32
+Dblqh::getKeyInfoWordOrZero(const TcConnectionrec* regTcPtr,
+ Uint32 offset)
+{
+ if (regTcPtr->keyInfoIVal != RNIL)
+ {
+ SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+ g_sectionSegmentPool);
+
+ if (keyInfoReader.getSize() > offset)
+ {
+ if (offset)
+ keyInfoReader.step(offset);
+
+ Uint32 word;
+ keyInfoReader.getWord(&word);
+ return word;
+ }
+ }
+ return 0;
+}
+
/* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */
/* */
/* ========================================================================= */
void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti)
{
- DatabufPtr regDatabufptr;
- regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
-
- do {
- jam();
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- Uint32 sig0 = regDatabufptr.p->data[0];
- Uint32 sig1 = regDatabufptr.p->data[1];
- Uint32 sig2 = regDatabufptr.p->data[2];
- Uint32 sig3 = regDatabufptr.p->data[3];
- signal->theData[Ti] = sig0;
- signal->theData[Ti + 1] = sig1;
- signal->theData[Ti + 2] = sig2;
- signal->theData[Ti + 3] = sig3;
- regDatabufptr.i = regDatabufptr.p->nextDatabuf;
- Ti += 4;
- } while (regDatabufptr.i != RNIL);
+ /* Copy all KeyInfo into the signal at offset Ti */
+ copy(&signal->theData[Ti],
+ tcConnectptr.p->keyInfoIVal);
}//Dblqh::sendKeyinfoAcc()
void Dblqh::execLQH_ALLOCREQ(Signal* signal)
@@ -4895,6 +5091,18 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign
regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx;
+ tupKeyReq->attrInfoIVal= RNIL;
+
+ /* Pass AttrInfo section if available in the TupKeyReq signal
+ * We are still responsible for releasing it, TUP is just
+ * borrowing it
+ */
+ if (tupKeyReq->attrBufLen > 0)
+ {
+ ndbassert( regTcPtr->attrInfoIVal != RNIL );
+ tupKeyReq->attrInfoIVal= regTcPtr->attrInfoIVal;
+ }
+
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
}//Dblqh::execACCKEYCONF()
@@ -5013,7 +5221,12 @@ void Dblqh::tupkeyConfLab(Signal* signal
commitContinueAfterBlockedLab(signal);
return;
}//if
+
regTcPtr->totSendlenAi = writeLen;
+ /* We will propagate / log writeLen words
+ * Check that that is how many we have available to
+ * propagate
+ */
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@@ -5446,13 +5659,26 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
Uint32 nextNodeId = regTcPtr->nextReplica;
Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
- UintR TAiLen = regTcPtr->reclenAiLqhkey;
+
+ /* Send long LqhKeyReq to next replica if it can support it */
+ bool sendLongReq= ! ((nextVersion < NDBD_LONG_LQHKEYREQ) ||
+ ERROR_INSERTED(5051));
+
+ UintR TAiLen = sendLongReq ?
+ 0 :
+ MIN(regTcPtr->totSendlenAi, LqhKeyReq::MaxAttrInfo);
+
+ /* Long LQHKeyReq uses section size for key length */
+ Uint32 lqhKeyLen= sendLongReq?
+ 0 :
+ regTcPtr->primKeyLen;
UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
LqhKeyReq::setAIInLqhKeyReq(Treqinfo, TAiLen);
+ LqhKeyReq::setKeyLen(Treqinfo,lqhKeyLen);
if (unlikely(nextVersion < NDBD_ROWID_VERSION))
{
@@ -5481,7 +5707,11 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
LqhKeyReq::setSameClientAndTcFlag(Treqinfo, TsameLqhAndClient);
LqhKeyReq::setReturnedReadLenAIFlag(Treqinfo, TreadLenAiInd);
- UintR TotReclenAi = regTcPtr->totSendlenAi;
+ /* Long LQHKeyReq uses section size for AttrInfo length */
+ UintR TotReclenAi = sendLongReq ?
+ 0 :
+ regTcPtr->totSendlenAi;
+
LqhKeyReq::setReorgFlag(TotReclenAi, regTcPtr->m_reorg);
/* ------------------------------------------------------------------------- */
@@ -5530,23 +5760,33 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
nextPos++;
}//if
sig0 = regTcPtr->readlenAi;
- sig1 = regTcPtr->tupkeyData[0];
- sig2 = regTcPtr->tupkeyData[1];
- sig3 = regTcPtr->tupkeyData[2];
- sig4 = regTcPtr->tupkeyData[3];
-
lqhKeyReq->variableData[nextPos] = sig0;
nextPos += TreadLenAiInd;
- lqhKeyReq->variableData[nextPos] = sig1;
- lqhKeyReq->variableData[nextPos + 1] = sig2;
- lqhKeyReq->variableData[nextPos + 2] = sig3;
- lqhKeyReq->variableData[nextPos + 3] = sig4;
- UintR TkeyLen = LqhKeyReq::getKeyLen(Treqinfo);
- if (TkeyLen < 4) {
- nextPos += TkeyLen;
- } else {
- nextPos += 4;
- }//if
+
+ if (!sendLongReq)
+ {
+ /* Short LQHKEYREQ to older LQH
+ * First few words of KeyInfo go into LQHKEYREQ
+ * Sometimes have no Keyinfo
+ */
+ if (regTcPtr->primKeyLen != 0)
+ {
+ SegmentedSectionPtr keyInfoSection;
+
+ ndbassert(regTcPtr->keyInfoIVal != RNIL);
+
+ getSection(keyInfoSection, regTcPtr->keyInfoIVal);
+ SectionReader keyInfoReader(keyInfoSection, g_sectionSegmentPool);
+
+ UintR keyLenInLqhKeyReq= MIN(LqhKeyReq::MaxKeyInfo,
+ regTcPtr->primKeyLen);
+
+ keyInfoReader.getWords(&lqhKeyReq->variableData[nextPos],
+ keyLenInLqhKeyReq);
+
+ nextPos+= keyLenInLqhKeyReq;
+ }
+ }
sig0 = regTcPtr->gci_hi;
Local_key tmp = regTcPtr->m_row_id;
@@ -5560,77 +5800,147 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
- if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
+ if (likely(sendLongReq))
{
- jam();
- sig0 = regTcPtr->firstAttrinfo[0];
- sig1 = regTcPtr->firstAttrinfo[1];
- sig2 = regTcPtr->firstAttrinfo[2];
- sig3 = regTcPtr->firstAttrinfo[3];
- sig4 = regTcPtr->firstAttrinfo[4];
+ /* Long LQHKEYREQ, attach KeyInfo and AttrInfo
+ * sections to signal
+ */
+ SectionHandle handle(this);
+ handle.m_cnt= 0;
- lqhKeyReq->variableData[nextPos] = sig0;
- lqhKeyReq->variableData[nextPos + 1] = sig1;
- lqhKeyReq->variableData[nextPos + 2] = sig2;
- lqhKeyReq->variableData[nextPos + 3] = sig3;
- lqhKeyReq->variableData[nextPos + 4] = sig4;
-
- nextPos += TAiLen;
- TAiLen = 0;
+ if (regTcPtr->primKeyLen > 0)
+ {
+ SegmentedSectionPtr keyInfoSection;
+
+ ndbassert(regTcPtr->keyInfoIVal != RNIL);
+ getSection(keyInfoSection, regTcPtr->keyInfoIVal);
+
+ handle.m_ptr[ LqhKeyReq::KeyInfoSectionNum ]= keyInfoSection;
+ handle.m_cnt= 1;
+
+ if (regTcPtr->totSendlenAi > 0)
+ {
+ SegmentedSectionPtr attrInfoSection;
+
+ ndbassert(regTcPtr->attrInfoIVal != RNIL);
+ getSection(attrInfoSection, regTcPtr->attrInfoIVal);
+
+ handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
+ handle.m_cnt= 2;
+ }
+ else
+ {
+ /* No AttrInfo to be sent on. This can occur for delete
+ * or with an interpreted update when no actual update
+ * is made
+ * In this case, we free any attrInfo section now.
+ */
+ if (regTcPtr->attrInfoIVal != RNIL)
+ {
+ ndbassert(!( regTcPtr->m_flags &
+ TcConnectionrec::OP_SAVEATTRINFO))
+ releaseSection(regTcPtr->attrInfoIVal);
+ regTcPtr->attrInfoIVal= RNIL;
+ }
+ }
+ }
+ else
+ {
+ /* Zero-length primary key, better not have any
+ * AttrInfo
+ */
+ ndbrequire(regTcPtr->totSendlenAi == 0);
+ ndbassert(regTcPtr->keyInfoIVal == RNIL);
+ ndbassert(regTcPtr->attrInfoIVal == RNIL);
+ }
+
+ sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
+ LqhKeyReq::FixedSignalLength + nextPos,
+ JBB,
+ &handle);
+
+ /* Long sections were freed as part of sendSignal */
+ ndbassert( handle.m_cnt == 0);
+ regTcPtr->keyInfoIVal= RNIL;
+ regTcPtr->attrInfoIVal= RNIL;
}
else
{
- Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
- lqhKeyReq->requestInfo = Treqinfo;
- }
-
- sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
- nextPos + LqhKeyReq::FixedSignalLength, JBB);
- if (regTcPtr->primKeyLen > 4) {
- jam();
-/* ------------------------------------------------------------------------- */
-/* MORE THAN 4 WORDS OF KEY DATA IS IN THE OPERATION. THEREFORE WE NEED TO */
-/* PREPARE A KEYINFO SIGNAL. MORE THAN ONE KEYINFO SIGNAL CAN BE SENT. */
-/* ------------------------------------------------------------------------- */
- sendTupkey(signal);
- }//if
-/* ------------------------------------------------------------------------- */
-/* NOW I AM PREPARED TO SEND ALL THE ATTRINFO SIGNALS. AT THE MOMENT A LOOP */
-/* SENDS ALL AT ONCE. LATER WE HAVE TO ADDRESS THE PROBLEM THAT THESE COULD */
-/* LEAD TO BUFFER EXPLOSION => NODE CRASH. */
-/* ------------------------------------------------------------------------- */
-/* NEW CODE TO SEND ATTRINFO IN PACK_LQHKEYREQ */
-/* THIS CODE USES A REAL-TIME BREAK AFTER */
-/* SENDING 16 SIGNALS. */
-/* -------------------------------------------------- */
- sig0 = regTcPtr->tcOprec;
- sig1 = regTcPtr->transid[0];
- sig2 = regTcPtr->transid[1];
- signal->theData[0] = sig0;
- signal->theData[1] = sig1;
- signal->theData[2] = sig2;
-
- if (unlikely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength > 25))
- {
- jam();
- /**
- * 4 replicas...
+ /* Short LQHKEYREQ to older LQH
+ * First few words of ATTRINFO go into LQHKEYREQ
+ * (if they fit)
*/
- memcpy(signal->theData+3, regTcPtr->firstAttrinfo, TAiLen << 2);
- sendSignal(lqhRef, GSN_ATTRINFO, signal, 3 + TAiLen, JBB);
+ if (TAiLen > 0)
+ {
+ if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
+ {
+ jam();
+ SegmentedSectionPtr attrInfoSection;
+
+ ndbassert(regTcPtr->attrInfoIVal != RNIL);
+
+ getSection(attrInfoSection, regTcPtr->attrInfoIVal);
+ SectionReader attrInfoReader(attrInfoSection, getSectionSegmentPool());
+
+ attrInfoReader.getWords(&lqhKeyReq->variableData[nextPos],
+ TAiLen);
+
+ nextPos+= TAiLen;
+ }
+ else
+ {
+ /* Not enough space in LQHKEYREQ, we'll send everything in
+ * separate ATTRINFO signals
+ */
+ Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
+ lqhKeyReq->requestInfo = Treqinfo;
+ TAiLen= 0;
+ }
+ }
+
+ sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
+ nextPos + LqhKeyReq::FixedSignalLength, JBB);
+
+ /* Send extra KeyInfo signals if necessary... */
+ if (regTcPtr->primKeyLen > LqhKeyReq::MaxKeyInfo) {
+ jam();
+ sendTupkey(signal);
+ }//if
+
+ /* Send extra AttrInfo signals if necessary... */
+ Uint32 remainingAiLen= regTcPtr->totSendlenAi - TAiLen;
+
+ if (remainingAiLen != 0)
+ {
+ sig0 = regTcPtr->tcOprec;
+ sig1 = regTcPtr->transid[0];
+ sig2 = regTcPtr->transid[1];
+ signal->theData[0] = sig0;
+ signal->theData[1] = sig1;
+ signal->theData[2] = sig2;
+
+ SectionReader attrInfoReader(regTcPtr->attrInfoIVal,
+ g_sectionSegmentPool);
+
+ ndbassert(attrInfoReader.getSize() == regTcPtr->totSendlenAi);
+
+ /* Step over words already sent in LQHKEYREQ above */
+ attrInfoReader.step(TAiLen);
+
+ while (remainingAiLen != 0)
+ {
+ Uint32 dataInSignal= MIN(AttrInfo::DataLength, remainingAiLen);
+ attrInfoReader.getWords(&signal->theData[3],
+ dataInSignal);
+ remainingAiLen-= dataInSignal;
+ sendSignal(lqhRef, GSN_ATTRINFO, signal,
+ AttrInfo::HeaderLength + dataInSignal, JBB);
+ }
+ }
}
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
- while (regAttrinbufptr.i != RNIL) {
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- jam();
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(dataLen != 0);
- MEMCOPY_NO_WORDS(&signal->theData[3], ®Attrinbufptr.p->attrbuf[0], dataLen);
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- sendSignal(lqhRef, GSN_ATTRINFO, signal, dataLen + 3, JBB);
- }//while
+ /* LQHKEYREQ sent */
+
regTcPtr->transactionState = TcConnectionrec::PREPARED;
if (regTcPtr->dirtyOp == ZTRUE) {
jam();
@@ -5758,49 +6068,24 @@ void Dblqh::writeLogHeader(Signal* signa
void Dblqh::writeKey(Signal* signal)
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Uint32 logPos, endPos, dataLen;
- Int32 remainingLen;
- logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- remainingLen = regTcPtr->primKeyLen;
- dataLen = remainingLen;
- if (remainingLen > 4)
- dataLen = 4;
- remainingLen -= dataLen;
- endPos = logPos + dataLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®TcPtr->tupkeyData[0],
- dataLen);
- } else {
- jam();
- for (Uint32 i = 0; i < dataLen; i++)
- writeLogWord(signal, regTcPtr->tupkeyData[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while (remainingLen > 0) {
- logPos = endPos;
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- dataLen = remainingLen;
- if (remainingLen > 4)
- dataLen = 4;
- remainingLen -= dataLen;
- endPos += dataLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®Databufptr.p->data[0],
- dataLen);
- } else {
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
- for (Uint32 i = 0; i < dataLen; i++)
- writeLogWord(signal, regDatabufptr.p->data[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- regDatabufptr.i = regDatabufptr.p->nextDatabuf;
- }//while
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
- ndbrequire(regDatabufptr.i == RNIL);
+ jam();
+ SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+ g_sectionSegmentPool);
+ const Uint32* srcPtr;
+ Uint32 length;
+ Uint32 wordsWritten= 0;
+
+ /* Write contiguous chunks of words from the KeyInfo
+ * section to the log
+ */
+ while (keyInfoReader.getWordsPtr(srcPtr,
+ length))
+ {
+ writeLogWords(signal, srcPtr, length);
+ wordsWritten+= length;
+ }
+
+ ndbassert( wordsWritten == regTcPtr->primKeyLen );
}//Dblqh::writeKey()
/* --------------------------------------------------------------------------
@@ -5814,44 +6099,26 @@ void Dblqh::writeAttrinfoLab(Signal* sig
Uint32 totLen = regTcPtr->currTupAiLen;
if (totLen == 0)
return;
- Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- Uint32 lqhLen = regTcPtr->reclenAiLqhkey;
- ndbrequire(totLen >= lqhLen);
- Uint32 endPos = logPos + lqhLen;
- totLen -= lqhLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®TcPtr->firstAttrinfo[0],
- lqhLen);
- } else {
- for (Uint32 i = 0; i < lqhLen; i++)
- writeLogWord(signal, regTcPtr->firstAttrinfo[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
- while (totLen > 0) {
- logPos = endPos;
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(totLen >= dataLen);
- ndbrequire(dataLen > 0);
- totLen -= dataLen;
- endPos += dataLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®Attrinbufptr.p->attrbuf[0],
- dataLen);
- } else {
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
- for (Uint32 i = 0; i < dataLen; i++)
- writeLogWord(signal, regAttrinbufptr.p->attrbuf[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- }//while
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
- ndbrequire(regAttrinbufptr.i == RNIL);
+
+ jam();
+ ndbassert( regTcPtr->attrInfoIVal != RNIL );
+ SectionReader attrInfoReader(regTcPtr->attrInfoIVal,
+ g_sectionSegmentPool);
+ const Uint32* srcPtr;
+ Uint32 length;
+ Uint32 wordsWritten= 0;
+
+ /* Write contiguous chunks of words from the
+ * AttrInfo section to the log
+ */
+ while (attrInfoReader.getWordsPtr(srcPtr,
+ length))
+ {
+ writeLogWords(signal, srcPtr, length);
+ wordsWritten+= length;
+ }
+
+ ndbassert( wordsWritten == totLen );
}//Dblqh::writeAttrinfoLab()
/* ------------------------------------------------------------------------- */
@@ -5861,31 +6128,31 @@ void Dblqh::writeAttrinfoLab(Signal* sig
/* ------------------------------------------------------------------------- */
void Dblqh::sendTupkey(Signal* signal)
{
- UintR TdataPos = 3;
BlockReference lqhRef = calcLqhBlockRef(tcConnectptr.p->nextReplica);
signal->theData[0] = tcConnectptr.p->tcOprec;
signal->theData[1] = tcConnectptr.p->transid[0];
signal->theData[2] = tcConnectptr.p->transid[1];
- databufptr.i = tcConnectptr.p->firstTupkeybuf;
- do {
- ptrCheckGuard(databufptr, cdatabufFileSize, databuf);
- signal->theData[TdataPos] = databufptr.p->data[0];
- signal->theData[TdataPos + 1] = databufptr.p->data[1];
- signal->theData[TdataPos + 2] = databufptr.p->data[2];
- signal->theData[TdataPos + 3] = databufptr.p->data[3];
- databufptr.i = databufptr.p->nextDatabuf;
- TdataPos += 4;
- if (databufptr.i == RNIL) {
- jam();
- sendSignal(lqhRef, GSN_KEYINFO, signal, TdataPos, JBB);
- return;
- } else if (TdataPos == 23) {
- jam();
- sendSignal(lqhRef, GSN_KEYINFO, signal, 23, JBB);
- TdataPos = 3;
- }
- } while (1);
+ Uint32 remainingLen= tcConnectptr.p->primKeyLen -
+ LqhKeyReq::MaxKeyInfo;
+
+ SectionReader keyInfoReader(tcConnectptr.p->keyInfoIVal,
+ g_sectionSegmentPool);
+
+ ndbassert(keyInfoReader.getSize() > LqhKeyReq::MaxKeyInfo);
+
+ /* Step over the words already sent in LQHKEYREQ */
+ keyInfoReader.step(LqhKeyReq::MaxKeyInfo);
+
+ while (remainingLen != 0)
+ {
+ Uint32 dataInSignal= MIN(KeyInfo::DataLength, remainingLen);
+ keyInfoReader.getWords(&signal->theData[3],
+ dataInSignal);
+ remainingLen-= dataInSignal;
+ sendSignal(lqhRef, GSN_KEYINFO, signal,
+ KeyInfo::HeaderLength + dataInSignal, JBB);
+ }
}//Dblqh::sendTupkey()
void Dblqh::cleanUp(Signal* signal)
@@ -5935,6 +6202,12 @@ void Dblqh::releaseOprec(Signal* signal)
regTcPtr->firstTupkeybuf = RNIL;
regTcPtr->lastTupkeybuf = RNIL;
+ /* Release long sections if present */
+ releaseSection(regTcPtr->keyInfoIVal);
+ regTcPtr->keyInfoIVal = RNIL;
+ releaseSection(regTcPtr->attrInfoIVal);
+ regTcPtr->attrInfoIVal = RNIL;
+
if (regTcPtr->m_dealloc)
{
jam();
@@ -6689,7 +6962,7 @@ void Dblqh::commitContinueAfterBlockedLa
TRACENR(" NrCopy");
if (LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
TRACENR(" rowid: " << regTcPtr.p->m_row_id);
- TRACENR(" key: " << regTcPtr.p->tupkeyData[0]);
+ TRACENR(" key: " << getKeyInfoWordOrZero(regTcPtr.p, 0));
TRACENR(endl);
}
@@ -7183,7 +7456,7 @@ void Dblqh::execACCKEYREF(Signal* signal
TRACENR(" NrCopy");
if (LqhKeyReq::getRowidFlag(tcPtr->reqinfo))
TRACENR(" rowid: " << tcPtr->m_row_id);
- TRACENR(" key: " << tcPtr->tupkeyData[0]);
+ TRACENR(" key: " << getKeyInfoWordOrZero(tcPtr, 0));
TRACENR(endl);
}
@@ -9513,27 +9786,29 @@ Dblqh::next_scanconf_tupkeyreq(Signal* s
tupKeyReq->tcOpIndex = regTcPtr->tcOprec;
tupKeyReq->savePointId = regTcPtr->savePointId;
tupKeyReq->disk_page= disk_page;
+ tupKeyReq->attrInfoIVal= RNIL;
Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
TupKeyReq::SignalLength);
}
/* -------------------------------------------------------------------------
- * RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
+ * STORE KEYINFO IN A LONG SECTION PRIOR TO SENDING
* -------------------------------------------------------------------------
* PRECONDITION: SCAN_STATE = WAIT_SCAN_KEYINFO
* ------------------------------------------------------------------------- */
-void
-Dblqh::keyinfoLab(const Uint32 * src, const Uint32 * end)
+bool
+Dblqh::keyinfoLab(const Uint32 * src, Uint32 len)
{
- do {
- jam();
- seizeTupkeybuf(0);
- databufptr.p->data[0] = * src ++;
- databufptr.p->data[1] = * src ++;
- databufptr.p->data[2] = * src ++;
- databufptr.p->data[3] = * src ++;
- } while (src < end);
+ ndbassert( tcConnectptr.p->keyInfoIVal == RNIL );
+ ndbassert( len > 0 );
+
+ if (ERROR_INSERTED(5052))
+ return false;
+
+ return(appendToSection(tcConnectptr.p->keyInfoIVal,
+ src,
+ len));
}//Dblqh::keyinfoLab()
Uint32
@@ -10754,7 +11029,7 @@ void Dblqh::nextScanConfCopyLab(Signal*
scanptr.p->m_curr_batch_size_rows++;
- if (signal->getLength() == 7)
+ if (signal->getLength() == NextScanConf::SignalLengthNoKeyInfo)
{
jam();
ndbrequire(nextScanConf->accOperationPtr == RNIL);
@@ -10843,27 +11118,22 @@ void Dblqh::nextScanConfCopyLab(Signal*
void Dblqh::execTRANSID_AI(Signal* signal)
{
jamEntry();
+ /* TransID_AI received from local TUP, data is linear inline in
+ * signal buff
+ */
tcConnectptr.i = signal->theData[0];
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- Uint32 length = signal->length() - 3;
+ Uint32 length = signal->length() - TransIdAI::HeaderLength;
ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::COPY_TUPKEY);
- Uint32 * src = &signal->theData[3];
- while(length > 22){
- if (saveTupattrbuf(signal, src, 22) == ZOK) {
- ;
- } else {
- jam();
- tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
- return;
- }//if
- src += 22;
- length -= 22;
- }
- if (saveTupattrbuf(signal, src, length) == ZOK) {
- return;
+ Uint32 * src = &signal->theData[ TransIdAI::HeaderLength ];
+ bool ok= appendToSection(tcConnectptr.p->attrInfoIVal,
+ src,
+ length);
+ if (unlikely(! ok))
+ {
+ jam();
+ tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
}
- jam();
- tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
}//Dblqh::execTRANSID_AI()
/*--------------------------------------------------------------------------*/
@@ -10907,13 +11177,15 @@ void Dblqh::copyTupkeyConfLab(Signal* si
tcConnectptr.p->totSendlenAi = readLength;
tcConnectptr.p->connectState = TcConnectionrec::COPY_CONNECTED;
- // Read primary keys (used to get here via scan keyinfo)
+ /* Read primary keys from TUP into signal buffer space
+ * (used to get here via scan keyinfo)
+ */
Uint32* tmp = signal->getDataPtrSend()+24;
Uint32 len= tcConnectptr.p->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
tcConP->gci_hi = tmp[len];
tcConP->gci_lo = 0;
- // Calculate hash (no need to linearies key)
+ // Calculate hash (no need to linearise key)
if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
{
tcConnectptr.p->hashValue = calculateHash(tableId, tmp);
@@ -10923,10 +11195,21 @@ void Dblqh::copyTupkeyConfLab(Signal* si
tcConnectptr.p->hashValue = md5_hash((Uint64*)tmp, len);
}
- // Move into databuffer to make packLqhkeyreqLab happy
- memcpy(tcConP->tupkeyData, tmp, 4*4);
- if(len > 4)
- keyinfoLab(tmp+4, tmp + len);
+ // Copy keyinfo into long section for LQHKEYREQ below
+ if (unlikely(!keyinfoLab(tmp, len)))
+ {
+ /* Failed to store keyInfo, fail copy
+ * This will result in a COPY_FRAGREF being sent to
+ * the starting node, which will cause it to fail
+ */
+ scanptr.p->scanErrorCounter++;
+ tcConP->errorCode= ZGET_DATAREC_ERROR;
+ scanptr.p->scanCompletedStatus= ZTRUE;
+
+ closeCopyLab(signal);
+ return;
+ }
+
LqhKeyReq::setKeyLen(tcConP->reqinfo, len);
/*---------------------------------------------------------------------------*/
@@ -16571,7 +16854,7 @@ void Dblqh::aiStateErrorCheckLab(Signal*
/* ACTIVE CREATION TO FALSE. THIS WILL ENSURE THAT THE ABORT IS */
/* COMPLETED. */
/*************************************************************************>*/
- if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
+ if (saveAttrInfoInSection(dataPtr, length) == ZOK) {
jam();
if (tcConnectptr.p->transactionState ==
TcConnectionrec::WAIT_AI_AFTER_ABORT) {
@@ -17541,6 +17824,9 @@ void Dblqh::initialiseTcrec(Signal* sign
tcConnectptr.p->lastAttrinbuf = RNIL;
tcConnectptr.p->firstTupkeybuf = RNIL;
tcConnectptr.p->lastTupkeybuf = RNIL;
+ tcConnectptr.p->keyInfoIVal = RNIL;
+ tcConnectptr.p->attrInfoIVal = RNIL;
+ tcConnectptr.p->m_flags= 0;
tcConnectptr.p->tcTimer = 0;
tcConnectptr.p->nextTcConnectrec = tcConnectptr.i + 1;
}//for
@@ -18007,27 +18293,13 @@ MPR_LOOP:
void Dblqh::readAttrinfo(Signal* signal)
{
Uint32 remainingLen = tcConnectptr.p->totSendlenAi;
+ tcConnectptr.p->reclenAiLqhkey = 0;
if (remainingLen == 0) {
jam();
- tcConnectptr.p->reclenAiLqhkey = 0;
return;
}//if
- Uint32 dataLen = remainingLen;
- if (remainingLen > 5)
- dataLen = 5;
- readLogData(signal, dataLen, &tcConnectptr.p->firstAttrinfo[0]);
- tcConnectptr.p->reclenAiLqhkey = dataLen;
- remainingLen -= dataLen;
- while (remainingLen > 0) {
- jam();
- dataLen = remainingLen;
- if (remainingLen > 22)
- dataLen = 22;
- seizeAttrinbuf(signal);
- readLogData(signal, dataLen, &attrinbufptr.p->attrbuf[0]);
- attrinbufptr.p->attrbuf[ZINBUF_DATA_LEN] = dataLen;
- remainingLen -= dataLen;
- }//while
+
+ readLogData(signal, remainingLen, tcConnectptr.p->attrInfoIVal);
}//Dblqh::readAttrinfo()
/* ------------------------------------------------------------------------- */
@@ -18201,20 +18473,8 @@ void Dblqh::readKey(Signal* signal)
{
Uint32 remainingLen = tcConnectptr.p->primKeyLen;
ndbrequire(remainingLen != 0);
- Uint32 dataLen = remainingLen;
- if (remainingLen > 4)
- dataLen = 4;
- readLogData(signal, dataLen, &tcConnectptr.p->tupkeyData[0]);
- remainingLen -= dataLen;
- while (remainingLen > 0) {
- jam();
- seizeTupkeybuf(signal);
- dataLen = remainingLen;
- if (dataLen > 4)
- dataLen = 4;
- readLogData(signal, dataLen, &databufptr.p->data[0]);
- remainingLen -= dataLen;
- }//while
+
+ readLogData(signal, remainingLen, tcConnectptr.p->keyInfoIVal);
}//Dblqh::readKey()
/* ------------------------------------------------------------------------- */
@@ -18222,15 +18482,25 @@ void Dblqh::readKey(Signal* signal)
/* */
/* SUBROUTINE SHORT NAME = RLD */
/* --------------------------------------------------------------------------*/
-void Dblqh::readLogData(Signal* signal, Uint32 noOfWords, Uint32* dataPtr)
+void Dblqh::readLogData(Signal* signal, Uint32 noOfWords, Uint32& sectionIVal)
{
- ndbrequire(noOfWords < 32);
Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
if ((logPos + noOfWords) >= ZPAGE_SIZE) {
for (Uint32 i = 0; i < noOfWords; i++)
- dataPtr[i] = readLogwordExec(signal);
+ {
+ /* Todo : Consider reading > 1 word at a time */
+ Uint32 word= readLogwordExec(signal);
+ bool ok= appendToSection(sectionIVal,
+ &word,
+ 1);
+ ndbrequire(ok);
+ }
} else {
- MEMCOPY_NO_WORDS(dataPtr, &logPagePtr.p->logPageWord[logPos], noOfWords);
+ /* In one bite */
+ bool ok= appendToSection(sectionIVal,
+ &logPagePtr.p->logPageWord[logPos],
+ noOfWords);
+ ndbrequire(ok);
logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + noOfWords;
}//if
}//Dblqh::readLogData()
@@ -18927,6 +19197,54 @@ void Dblqh::writeLogWord(Signal* signal,
}//Dblqh::writeLogWord()
/* --------------------------------------------------------------------------
+ * ------- WRITE MULTIPLE WORDS INTO THE LOG, CHECK FOR NEW PAGES -------
+ *
+ * ------------------------------------------------------------------------- */
+
+void Dblqh::writeLogWords(Signal* signal, const Uint32* data, Uint32 len)
+{
+ Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
+ ndbrequire(logPos < ZPAGE_SIZE);
+ Uint32 wordsThisPage= ZPAGE_SIZE - logPos;
+
+ while (len >= wordsThisPage)
+ {
+ /* Fill rest of the log page */
+ MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
+ data,
+ wordsThisPage);
+ logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = ZPAGE_SIZE;
+ data+= wordsThisPage;
+ len-= wordsThisPage;
+
+ /* Mark page completed and get a new one */
+ jam();
+ completedLogPage(signal, ZNORMAL, __LINE__);
+ seizeLogpage(signal);
+ initLogpage(signal);
+ logFilePtr.p->currentLogpage = logPagePtr.i;
+ logFilePtr.p->currentFilepage++;
+
+ logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
+ ndbrequire(logPos < ZPAGE_SIZE);
+ wordsThisPage= ZPAGE_SIZE - logPos;
+ }
+
+ if (len > 0)
+ {
+ /* No need to worry about next page */
+ ndbassert( len < wordsThisPage );
+ /* Write partial log page */
+ MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
+ data,
+ len);
+ logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + len;
+ }
+
+ ndbassert( logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] < ZPAGE_SIZE );
+}
+
+/* --------------------------------------------------------------------------
* ------- WRITE A NEXT LOG RECORD AND CHANGE TO NEXT MBYTE -------
*
* SUBROUTINE SHORT NAME: WNL
@@ -19686,11 +20004,11 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
<< endl;
ndbout << " transid0 = " << hex << tcRec.p->transid[0]
<< " transid1 = " << hex << tcRec.p->transid[1]
- << " tupkeyData0 = " << tcRec.p->tupkeyData[0]
- << " tupkeyData1 = " << tcRec.p->tupkeyData[1]
+ << " key[0] = " << getKeyInfoWordOrZero(tcRec.p, 0)
+ << " key[1] = " << getKeyInfoWordOrZero(tcRec.p, 1)
<< endl;
- ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2]
- << " tupkeyData3 = " << tcRec.p->tupkeyData[3]
+ ndbout << " key[2] = " << getKeyInfoWordOrZero(tcRec.p, 2)
+ << " key[3] = " << getKeyInfoWordOrZero(tcRec.p, 3)
<< " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt
<< endl;
switch (tcRec.p->transactionState) {
@@ -19894,7 +20212,6 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
if (arg == 2352 && signal->getLength() == 2)
{
jam();
- Uint32 i;
Uint32 opNo = signal->theData[1];
TcConnectionrecPtr tcRec;
if (opNo < ttcConnectrecFileSize)
@@ -19903,24 +20220,16 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
tcRec.i = opNo;
ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
- Uint32 keyLen = tcRec.p->primKeyLen;
BaseString key;
- for(i = 0; i<keyLen && i < 4; i++)
+ if (tcRec.p->keyInfoIVal != RNIL)
{
- jam();
- key.appfmt("0x%x ", tcRec.p->tupkeyData[i]);
- }
-
- if (keyLen > 4)
- {
- jam();
- tcConnectptr = tcRec;
- sendKeyinfoAcc(signal, 4);
- for (i = 4; i<keyLen; i++)
- {
- jam();
- key.appfmt("0x%x ", signal->theData[i]);
- }
+ jam();
+ SectionReader keyInfoReader(tcRec.p->keyInfoIVal,
+ g_sectionSegmentPool);
+
+ Uint32 keyWord;
+ while (keyInfoReader.getWord(&keyWord))
+ key.appfmt("0x%x ", keyWord);
}
char buf[100];
@@ -20097,18 +20406,14 @@ Dblqh::TRACE_OP_DUMP(const Dblqh::TcConn
{
(* traceopout) << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- (* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
+ if (regTcPtr->keyInfoIVal != RNIL)
{
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- (* traceopout) << hex << regDatabufptr.p->data[j] << " ";
+ SectionReader keyInfoReader(regTcPtr->keyInfoIVal,
+ g_sectionSegmentPool);
+
+ Uint32 keyWord;
+ while (keyInfoReader.getWord(&keyWord))
+ (* traceopout) << hex << keyWord << " ";
}
(* traceopout) << "] ";
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-07-01 12:35:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-08-07 11:52:50 +0000
@@ -1975,7 +1975,6 @@ void Dbtc::tckeyreq020Lab(Signal* signal
&signal->theData[KeyInfo::HeaderLength],
wordsInSignal))
{
- // TODO : Consider error insert
jam();
appendToSectionErrorLab(signal);
return;
@@ -2991,8 +2990,6 @@ void Dbtc::execTCKEYREQ(Signal* signal)
bool ok= appendToSection(regCachePtr->keyInfoSectionI,
&TOptionalDataPtr[TkeyIndex],
keyInfoInTCKeyReq);
-
- // TODO : Consider error insert
if (!ok)
{
jam();
@@ -3007,8 +3004,6 @@ void Dbtc::execTCKEYREQ(Signal* signal)
ok= appendToSection(regCachePtr->attrInfoSectionI,
&TOptionalDataPtr[TAIDataIndex],
titcLenAiInTckeyreq);
-
- // TODO : Consider error insert
if (!ok)
{
jam();
@@ -3532,13 +3527,27 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
reorg = 2;
}
- /* Todo : Determine whether to use a long Lqh key req based on the
- * version of the target Lqh node
+ Uint32 inlineKeyLen= 0;
+ Uint32 inlineAttrLen= 0;
+
+ /* We normally send long LQHKEYREQ unless the
+ * destination cannot handle it or we are
+ * testing
*/
- regCachePtr->useLongLqhKeyReq= 0;
+ if (unlikely((version < NDBD_LONG_LQHKEYREQ) ||
+ ERROR_INSERTED(8069)))
+ {
+ /* Short LQHKEYREQ, with some key/attr data inline */
+ regCachePtr->useLongLqhKeyReq= 0;
+ inlineKeyLen= regCachePtr->keylen;
+ inlineAttrLen= regCachePtr->attrlength;
+ }
+ else
+ /* Long LQHKEYREQ, with key/attr data in long sections */
+ regCachePtr->useLongLqhKeyReq= 1;
tslrAttrLen = 0;
- LqhKeyReq::setAttrLen(tslrAttrLen, regCachePtr->attrlength);
+ LqhKeyReq::setAttrLen(tslrAttrLen, inlineAttrLen);
/* ---------------------------------------------------------------------- */
// Bit16 == 0 since StoredProcedures are not yet supported.
/* ---------------------------------------------------------------------- */
@@ -3551,7 +3560,7 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
sig1 = regTcPtr->operation;
sig2 = regTcPtr->dirtyOp;
bool dirtyRead = (sig1 == ZREAD && sig2 == ZTRUE);
- LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
+ LqhKeyReq::setKeyLen(Tdata10, inlineKeyLen);
LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
if (unlikely(version < NDBD_ROWID_VERSION))
{
@@ -3647,9 +3656,42 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
nextPos++;
}//if
+ // Reset trigger count
+ regTcPtr->accumulatingTriggerData.i = RNIL;
+ regTcPtr->accumulatingTriggerData.p = NULL;
+ regTcPtr->noFiredTriggers = 0;
+ regTcPtr->triggerExecutionCount = 0;
+
if (regCachePtr->useLongLqhKeyReq)
{
- ndbassert(false); // TODO
+ /* Build long LQHKeyReq using Key + AttrInfo sections */
+ SectionHandle handle(this);
+ SegmentedSectionPtr keyInfoSection;
+
+ getSection(keyInfoSection, regCachePtr->keyInfoSectionI);
+
+ handle.m_ptr[ LqhKeyReq::KeyInfoSectionNum ]= keyInfoSection;
+ handle.m_cnt= 1;
+
+ if (regCachePtr->attrlength != 0)
+ {
+ SegmentedSectionPtr attrInfoSection;
+
+ ndbassert(regCachePtr->attrInfoSectionI != RNIL);
+ getSection(attrInfoSection, regCachePtr->attrInfoSectionI);
+
+ handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection;
+ handle.m_cnt= 2;
+ }
+
+ sendSignal(TBRef, GSN_LQHKEYREQ, signal,
+ nextPos + LqhKeyReq::FixedSignalLength, JBB,
+ &handle);
+
+ /* Long sections were freed as part of sendSignal */
+ ndbassert( handle.m_cnt == 0 );
+ regCachePtr->keyInfoSectionI= RNIL;
+ regCachePtr->attrInfoSectionI= RNIL;
}
else
{
@@ -3685,17 +3727,10 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
nextPos+= aiInLqhKeyReq;
}
- }
- // Reset trigger count
- regTcPtr->accumulatingTriggerData.i = RNIL;
- regTcPtr->accumulatingTriggerData.p = NULL;
- regTcPtr->noFiredTriggers = 0;
- regTcPtr->triggerExecutionCount = 0;
-
- // TODO : Long LqhKeyReq variant
- sendSignal(TBRef, GSN_LQHKEYREQ, signal,
- nextPos + LqhKeyReq::FixedSignalLength, JBB);
+ sendSignal(TBRef, GSN_LQHKEYREQ, signal,
+ nextPos + LqhKeyReq::FixedSignalLength, JBB);
+ }
}//Dbtc::sendlqhkeyreq()
void Dbtc::packLqhkeyreq040Lab(Signal* signal,
@@ -3996,7 +4031,7 @@ void Dbtc::execSIGNAL_DROPPED_REP(Signal
DEBUG("SignalDroppedRep received for GSN " << originalGSN);
// TODO : Add handling for long signal variants as they
- // are added here (TCINDXREQ, SCANTABREQ etc.)
+ // are added here (SCANTABREQ etc.)
switch(originalGSN) {
case GSN_TCKEYREQ:
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-08-07 11:52:50 +0000
@@ -784,15 +784,16 @@ struct Operationrec {
Uint32 m_undo_buffer_space; // In words
union {
- Uint32 firstAttrinbufrec; //Used until copyAttrinfo
+ Uint32 firstAttrinbufrec; //Used until copyAttrinfo TODO Remove
};
+
Uint32 m_any_value;
union {
- Uint32 lastAttrinbufrec; //Used until copyAttrinfo
+ Uint32 lastAttrinbufrec; //Used until copyAttrinfo TODO Remove
Uint32 nextPool;
};
- Uint32 attrinbufLen; //only used during STORED_PROCDEF phase
- Uint32 storedProcPtr; //only used during STORED_PROCDEF phase
+ Uint32 attrinbufLen; //only used during STORED_PROCDEF phase TODO Remove
+ Uint32 storedProcPtr; //only used during STORED_PROCDEF phase TODO Remove
/*
* From fragment i-value we can find fragment and table record
@@ -2055,9 +2056,9 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
- void sendLogAttrinfo(Signal* signal,
- Uint32 TlogSize,
- Operationrec * regOperPtr);
+ int sendLogAttrinfo(Signal* signal,
+ Uint32 TlogSize,
+ Operationrec * regOperPtr);
//------------------------------------------------------------------
//------------------------------------------------------------------
@@ -2485,7 +2486,8 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
- void copyAttrinfo(Operationrec * regOperPtr, Uint32* inBuffer);
+ void copyAttrinfo(Operationrec * regOperPtr, Uint32* inBuffer,
+ Uint32 expectedLen, Uint32 attrInfoIVal);
//------------------------------------------------------------------
//------------------------------------------------------------------
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-06-05 20:34:20 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-08-07 12:32:06 +0000
@@ -80,8 +80,37 @@ int Dbtup::initStoredOperationrec(Operat
}
void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
- Uint32* inBuffer)
+ Uint32* inBuffer,
+ Uint32 expectedLen,
+ Uint32 attrInfoIVal)
{
+ if (attrInfoIVal != RNIL)
+ {
+ /* AttrInfo is in segmented section */
+ ndbassert(regOperPtr->firstAttrinbufrec == RNIL);
+ ndbassert(regOperPtr->lastAttrinbufrec == RNIL);
+
+ // TODO : Add support for Stored procedure attrinfo
+
+ /* Check length */
+ SegmentedSectionPtr sectionPtr;
+ getSection(sectionPtr, attrInfoIVal);
+
+ ndbrequire(sectionPtr.sz == expectedLen);
+ ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);
+
+ /* Copy attrInfo data into linear buffer */
+ copy(inBuffer, attrInfoIVal);
+
+ regOperPtr->storedProcedureId= RNIL;
+ regOperPtr->m_any_value= 0;
+
+ return;
+ }
+
+ /* Todo : Unify with code above when scan processing
+ * goes long
+ */
AttrbufrecPtr copyAttrBufPtr;
Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
int RbufLen;
@@ -696,6 +725,17 @@ void Dbtup::execTUPKEYREQ(Signal* signal
req_struct.m_row_id.m_page_no = sig1;
req_struct.m_row_id.m_page_idx = sig2;
+
+ /* Get AttrInfo section if this is a long TUPKEYREQ */
+ Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
+
+ /* If we have AttrInfo, check we expected it, and
+ * that we don't have AttrInfo by another means
+ */
+ ndbassert( (attrInfoIVal == RNIL) ||
+ (tupKeyReq->attrBufLen > 0));
+ ndbassert( (attrInfoIVal == RNIL) ||
+ (regOperPtr->firstAttrinbufrec == RNIL ));
Uint32 Roptype = regOperPtr->op_struct.op_type;
@@ -705,7 +745,10 @@ void Dbtup::execTUPKEYREQ(Signal* signal
Rstoredid) == ZOK);
}
- copyAttrinfo(regOperPtr, &cinBuffer[0]);
+ copyAttrinfo(regOperPtr,
+ &cinBuffer[0],
+ req_struct.attrinfo_len,
+ attrInfoIVal);
Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
@@ -1921,7 +1964,11 @@ int Dbtup::interpreterStartLab(Signal* s
Uint32 RattroutCounter= 0;
Uint32 RinstructionCounter= 5;
- Uint32 RlogSize= 0;
+
+ /* All information to be logged/propagated to replicas
+ * is generated from here on so reset the log word count
+ */
+ Uint32 RlogSize= req_struct->log_size= 0;
if (((RtotalLen + 5) == RattrinbufLen) &&
(RattrinbufLen >= 5) &&
(RattrinbufLen < ZATTR_BUFFER_SIZE)) {
@@ -2027,11 +2074,16 @@ int Dbtup::interpreterStartLab(Signal* s
return -1;
}
}
- req_struct->log_size= RlogSize;
+ /* Add log words explicitly generated here to existing log size
+ * - readAttributes can generate log for ANYVALUE column
+ * It adds the words directly to req_struct->log_size
+ * This is used for ANYVALUE and interpreted delete.
+ */
+ req_struct->log_size+= RlogSize;
req_struct->read_length= RattroutCounter;
sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
if (RlogSize > 0) {
- sendLogAttrinfo(signal, RlogSize, regOperPtr);
+ return sendLogAttrinfo(signal, RlogSize, regOperPtr);
}
return 0;
} else {
@@ -2047,25 +2099,41 @@ int Dbtup::interpreterStartLab(Signal* s
/* TLOG_START FIRST INDEX TO LOG */
/* TLOG_END LAST INDEX + 1 TO LOG */
/* ---------------------------------------------------------------- */
-void Dbtup::sendLogAttrinfo(Signal* signal,
- Uint32 TlogSize,
- Operationrec * const regOperPtr)
+int Dbtup::sendLogAttrinfo(Signal* signal,
+ Uint32 TlogSize,
+ Operationrec * const regOperPtr)
{
- Uint32 TbufferIndex= 0;
+ /* Copy from Log buffer to segmented section,
+ * then attach to ATTRINFO and execute direct
+ * to LQH
+ */
+ ndbrequire( TlogSize > 0 );
+ Uint32 longSectionIVal= RNIL;
+ bool ok= appendToSection(longSectionIVal,
+ &clogMemBuffer[0],
+ TlogSize);
+ if (unlikely(!ok))
+ {
+ /* Resource error, abort transaction */
+ terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
+ tupkeyErrorLab(signal);
+ return -1;
+ }
+
+ /* Send a TUP_ATTRINFO signal to LQH, which contains
+ * the relevant user pointer and the attrinfo section's
+ * IVAL
+ */
signal->theData[0]= regOperPtr->userpointer;
- while (TlogSize > 22) {
- MEMCOPY_NO_WORDS(&signal->theData[3],
- &clogMemBuffer[TbufferIndex],
- 22);
- EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
- TbufferIndex += 22;
- TlogSize -= 22;
- }
- MEMCOPY_NO_WORDS(&signal->theData[3],
- &clogMemBuffer[TbufferIndex],
- TlogSize);
- EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
+ signal->theData[1]= TlogSize;
+ signal->theData[2]= longSectionIVal;
+
+ EXECUTE_DIRECT(DBLQH,
+ GSN_TUP_ATTRINFO,
+ signal,
+ 3);
+ return 0;
}
inline
=== modified file 'storage/ndb/src/kernel/vm/LongSignal.hpp'
--- a/storage/ndb/src/kernel/vm/LongSignal.hpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/LongSignal.hpp 2008-08-07 11:52:50 +0000
@@ -52,6 +52,7 @@ extern SectionSegmentPool g_sectionSegme
void print(SegmentedSectionPtr ptr, FILE* out);
void copy(SegmentedSectionPtr dst, Uint32 * src, Uint32 len);
void copy(Uint32 * dst, SegmentedSectionPtr src);
+void copy(Uint32 * dst, Uint32 srcFirstIVal);
bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
/* appendToSection : If firstSegmentIVal == RNIL, import */
bool appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len);
=== modified file 'storage/ndb/src/kernel/vm/SectionReader.cpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.cpp 2008-08-07 11:52:50 +0000
@@ -41,6 +41,17 @@ SectionReader::SectionReader
}
}
+SectionReader::SectionReader
+(Uint32 firstSectionIVal, class SectionSegmentPool& pool)
+ : m_pool(pool)
+{
+ SectionSegment* firstSeg= m_pool.getPtr(firstSectionIVal);
+
+ m_pos = 0;
+ m_len = firstSeg->m_sz;
+ m_head = m_currentSegment = firstSeg;
+}
+
void
SectionReader::reset(){
m_pos = 0;
@@ -137,7 +148,63 @@ SectionReader::getWords(Uint32 * dst, Ui
memcpy(dst, &p->theData[ind], 4 * len);
m_pos += len;
- m_currentSegment = p;
+
+ /* If we read everything in the current
+ * segment, and there's another segment,
+ * move onto it for next time
+ */
+ if (unlikely((len == left) &&
+ (p->m_nextSegment != RNIL)))
+ p= m_pool.getPtr(p->m_nextSegment);
+ else
+ m_currentSegment = p;
+
return true;
}
+bool
+SectionReader::getWordsPtr(Uint32 maxLen,
+ const Uint32*& readPtr,
+ Uint32& actualLen)
+{
+ if(m_pos >= m_len)
+ return false;
+
+ /* We return a pointer to the current position,
+ * with length the minimum of
+ * - significant words remaining in the whole section
+ * - space remaining in the current segment
+ * - maxLen from caller
+ */
+ const Uint32 sectionRemain= m_len - m_pos;
+ const Uint32 startInd = (m_pos % SectionSegment::DataLength);
+ const Uint32 segmentSpace = SectionSegment::DataLength - startInd;
+ SectionSegment * p = m_currentSegment;
+
+ const Uint32 remain= MIN(sectionRemain, segmentSpace);
+ actualLen= MIN(remain, maxLen);
+ readPtr= &p->theData[startInd];
+
+ /* If we've read everything in this segment, and
+ * there's another one, move onto it ready for
+ * next time
+ */
+ if (((startInd + actualLen) == SectionSegment::DataLength) &&
+ (p->m_nextSegment != RNIL))
+ m_currentSegment= m_pool.getPtr(p->m_nextSegment);
+
+ m_pos += actualLen;
+ return true;
+};
+
+bool
+SectionReader::getWordsPtr(const Uint32*& readPtr,
+ Uint32& actualLen)
+{
+ /* Cannot have more than SectionSegment::DataLength
+ * contiguous words
+ */
+ return getWordsPtr(SectionSegment::DataLength,
+ readPtr,
+ actualLen);
+}
=== modified file 'storage/ndb/src/kernel/vm/SectionReader.hpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/vm/SectionReader.hpp 2008-08-07 11:52:50 +0000
@@ -22,15 +22,37 @@ class SectionReader {
public:
SectionReader(struct SegmentedSectionPtr &,
class SectionSegmentPool &);
+ SectionReader(Uint32 firstSectionIVal,
+ class SectionSegmentPool &);
+ /* reset : Set SectionReader to start of section */
void reset();
+ /* step : Step over given number of words */
bool step(Uint32 len);
+ /* getWord : Copy one word to dst + move forward */
bool getWord(Uint32 * dst);
+ /* peekWord : Copy one word to dst */
bool peekWord(Uint32 * dst) const ;
+ /* peekWords : Copy len words to dst */
bool peekWords(Uint32 * dst, Uint32 len) const;
+ /* getSize : Get total size of section */
Uint32 getSize() const;
+ /* getWords : Copy len words to dst + move forward */
bool getWords(Uint32 * dst, Uint32 len);
+ /* getWordsPtr : Get const ptr to next contiguous
+ * block of words
+ * In success case will return at least 1 word
+ */
+ bool getWordsPtr(const Uint32*& readPtr,
+ Uint32& actualLen);
+ /* getWordsPtr : Get const ptr to at most maxLen words
+ * In success case will return at least 1 word
+ */
+ bool getWordsPtr(Uint32 maxLen,
+ const Uint32*& readPtr,
+ Uint32& actualLen);
+
private:
Uint32 m_pos;
Uint32 m_len;
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-08-07 11:52:50 +0000
@@ -142,6 +142,16 @@ copy(Uint32 * dst, SegmentedSectionPtr s
copy(dst, g_sectionSegmentPool, src);
}
+/* Copy variant which takes an IVal */
+void
+copy(Uint32* dst, Uint32 srcFirstIVal)
+{
+ SegmentedSectionPtr p;
+ getSection(p, srcFirstIVal);
+
+ copy(dst, p);
+}
+
/* Calculate number of segments to release based on section size
* Always release one segment, even if size is zero
*/
=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2008-08-05 10:52:53 +0000
@@ -942,6 +942,28 @@ NdbOperation::buildSignalsNdbRecord(Uint
}
}
+ if (m_use_any_value &&
+ (tOpType == DeleteRequest))
+ {
+ /* Special hack for delete and ANYVALUE pseudo-column
+ * We want to be able set the ANYVALUE pseudo-column as
+ * part of a delete, but deletes don't allow updates
+ * So we perform a 'read' of the column, passing a value.
+ * Code in TUP which handles this 'read' will set the
+ * value when the read is processed.
+ */
+ res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+ AttributeHeader::ANY_VALUE, 4,
+ &attrInfoPtr, &remain);
+ if(res)
+ return res;
+ res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+ (const char *)(&m_any_value), 4,
+ &attrInfoPtr, &remain);
+ if(res)
+ return res;
+ }
+
/* Interpreted program main signal words */
if (code)
{
@@ -1126,10 +1148,9 @@ NdbOperation::buildSignalsNdbRecord(Uint
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest) ||
- (tOpType == UpdateRequest) ||
- (tOpType == DeleteRequest))
+ (tOpType == UpdateRequest))
{
- /* Handle any setAnyValue(). */
+ /* Handle setAnyValue() for all cases except delete */
if (m_use_any_value)
{
res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
=== modified file 'storage/ndb/src/ndbapi/NdbScanFilter.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-08-06 15:46:41 +0000
@@ -75,7 +75,11 @@ public:
m_code= code;
m_associated_op= NULL;
- m_error.code = 0;
+ if (code == NULL)
+ /* NdbInterpretedCode not supported for operation type */
+ m_error.code = 4539;
+ else
+ m_error.code = 0;
};
/* This method propagates an error code from NdbInterpretedCode
@@ -136,13 +140,22 @@ NdbScanFilter::NdbScanFilter(class NdbOp
{
DBUG_ENTER("NdbScanFilter::NdbScanFilter(NdbOperation)");
- /* We ask the NdbScanOperation to allocate an InterpretedCode
- * object for us. It will look after freeing it when
- * necessary. This allows the InterpretedCode object to
- * survive after the NdbScanFilter has gone out of scope
+ NdbInterpretedCode* code= NULL;
+ NdbOperation::Type opType= op->getType();
+
+ /* If the operation is not of the correct type then
+ * m_impl.init() will set an error on the scan filter
*/
- NdbInterpretedCode* code=
- ((NdbScanOperation *)op)->allocInterpretedCodeOldApi();
+ if (likely((opType == NdbOperation::TableScan) ||
+ (opType == NdbOperation::OrderedIndexScan)))
+ {
+ /* We ask the NdbScanOperation to allocate an InterpretedCode
+ * object for us. It will look after freeing it when
+ * necessary. This allows the InterpretedCode object to
+ * survive after the NdbScanFilter has gone out of scope
+ */
+ code= ((NdbScanOperation *)op)->allocInterpretedCodeOldApi();
+ }
m_impl.init(code);
@@ -158,6 +171,8 @@ NdbScanFilter::~NdbScanFilter()
int
NdbScanFilter::begin(Group group){
+ if (m_impl.m_error.code != 0) return -1;
+
if (m_impl.m_stack2.push_back(m_impl.m_negative))
{
/* Memory allocation problem */
@@ -244,6 +259,8 @@ NdbScanFilter::begin(Group group){
int
NdbScanFilter::end(){
+ if (m_impl.m_error.code != 0) return -1;
+
if(m_impl.m_stack2.size() == 0){
/* Invalid set of range scan bounds */
m_impl.m_error.code= 4259;
@@ -353,6 +370,8 @@ NdbScanFilter::end(){
int
NdbScanFilter::istrue(){
+ if(m_impl.m_error.code != 0) return -1;
+
if(m_impl.m_current.m_group < NdbScanFilter::AND ||
m_impl.m_current.m_group > NdbScanFilter::NOR){
/* Operator is not defined in NdbScanFilter::Group */
@@ -373,6 +392,7 @@ NdbScanFilter::istrue(){
int
NdbScanFilter::isfalse(){
+ if (m_impl.m_error.code != 0) return -1;
if(m_impl.m_current.m_group < NdbScanFilter::AND ||
m_impl.m_current.m_group > NdbScanFilter::NOR){
/* Operator is not defined in NdbScanFilter::Group */
@@ -436,6 +456,8 @@ const int tab2_sz = sizeof(table2)/sizeo
int
NdbScanFilterImpl::cond_col(Interpreter::UnaryCondition op, Uint32 AttrId){
+ if (m_error.code != 0) return -1;
+
if(op < 0 || op >= tab2_sz){
/* Condition is out of bounds */
m_error.code= 4262;
@@ -458,6 +480,8 @@ NdbScanFilterImpl::cond_col(Interpreter:
int
NdbScanFilter::isnull(int AttrId){
+ if (m_impl.m_error.code != 0) return -1;
+
if(m_impl.m_negative == 1)
return m_impl.cond_col(Interpreter::IS_NOT_NULL, AttrId);
else
@@ -466,6 +490,8 @@ NdbScanFilter::isnull(int AttrId){
int
NdbScanFilter::isnotnull(int AttrId){
+ if (m_impl.m_error.code != 0) return -1;
+
if(m_impl.m_negative == 1)
return m_impl.cond_col(Interpreter::IS_NULL, AttrId);
else
@@ -568,6 +594,8 @@ int
NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op,
Uint32 AttrId,
const void * value, Uint32 len){
+ if (m_error.code != 0) return -1;
+
if(op < 0 || op >= tab3_sz){
/* Condition is out of bounds */
m_error.code= 4262;
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2008-06-17 20:28:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2008-08-06 15:46:41 +0000
@@ -53,7 +53,7 @@ NdbScanOperation::NdbScanOperation(Ndb*
theSCAN_TABREQ = 0;
m_executed = false;
m_scan_buffer= NULL;
- m_scanUsingOldApi= false;
+ m_scanUsingOldApi= true;
m_interpretedCodeOldApi= NULL;
}
@@ -119,7 +119,7 @@ NdbScanOperation::init(const NdbTableImp
m_descending= false;
m_read_range_no = 0;
m_executed = false;
- m_scanUsingOldApi= false;
+ m_scanUsingOldApi= true;
m_interpretedCodeOldApi= NULL;
m_api_receivers_count = 0;
@@ -937,16 +937,14 @@ NdbScanOperation::readTuples(NdbScanOper
Uint32 parallel,
Uint32 batch)
{
- // It is only possible to call readTuples if
- // 1. the scan transaction doesn't already contain another scan operation
- // 2. We have not already defined an old Api scan operation.
- if (theNdbCon->theScanningOp != NULL ||
- m_scanUsingOldApi ){
+ // It is only possible to call readTuples if the scan transaction
+ // doesn't already contain a scan operation
+ if (theNdbCon->theScanningOp != NULL)
+ {
setErrorCode(4605);
return -1;
}
-
- m_scanUsingOldApi= true;
+
/* Save parameters for later */
m_savedLockModeOldApi= lm;
m_savedScanFlagsOldApi= scan_flags;
=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2008-06-03 16:18:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2008-08-06 15:39:54 +0000
@@ -2583,6 +2583,8 @@ NdbTransaction::scanTable(const NdbRecor
DBUG_RETURN(NULL);
}
+ op_idx->m_scanUsingOldApi= false;
+
/* The real work is done in NdbScanOperation */
if (op_idx->scanTableImpl(result_record,
lock_mode,
@@ -2622,6 +2624,8 @@ NdbTransaction::scanIndex(const NdbRecor
return NULL;
}
+ op->m_scanUsingOldApi= false;
+
/* Defer the rest of the work to NdbIndexScanOperation */
if (op->scanIndexImpl(key_record,
result_record,
=== modified file 'storage/ndb/test/ndbapi/testDict.cpp'
--- a/storage/ndb/test/ndbapi/testDict.cpp 2008-08-04 13:40:17 +0000
+++ b/storage/ndb/test/ndbapi/testDict.cpp 2008-08-07 12:32:06 +0000
@@ -57,6 +57,13 @@ int runCreateInvalidTables(NDBT_Context*
int result = NDBT_OK;
char failTabName[256];
+
+ const int expectedDictErrors[6]= {720,
+ 4317,
+ 737,
+ 739,
+ 736,
+ 740 };
for (int i = 0; i < 10; i++){
BaseString::snprintf(failTabName, 256, "F%d", i);
@@ -71,6 +78,19 @@ int runCreateInvalidTables(NDBT_Context*
result = NDBT_FAILED;
}
+ // Ensure any error is roughly as expected
+ int errorCode=pNdb->getDictionary()->getNdbError().code;
+ bool errorOk= false;
+ for (int e=0; e < 6; e++)
+ errorOk |= (errorCode == expectedDictErrors[e]);
+
+ if (!errorOk)
+ {
+ ndbout << "Failure, got dict error : " << pNdb->getDictionary()->
+ getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+
// Verify that table is not in db
const NdbDictionary::Table* pTab2 =
NDBT_Table::discoverTableFromDb(pNdb, failTabName) ;
=== modified file 'storage/ndb/test/ndbapi/testScanFilter.cpp'
--- a/storage/ndb/test/ndbapi/testScanFilter.cpp 2008-08-04 13:40:17 +0000
+++ b/storage/ndb/test/ndbapi/testScanFilter.cpp 2008-08-06 15:46:41 +0000
@@ -894,6 +894,132 @@ int runMaxScanFilterSize(NDBT_Context* c
return NDBT_OK;
}
+
+int runScanFilterConstructorFail(NDBT_Context* ctx, NDBT_Step* step)
+{
+ /* We test that failures in the ScanFilter constructor can be
+ * detected by the various ScanFilter methods without
+ * issues
+ */
+ Ndb *myNdb = GETNDB(step);
+ const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
+ const NdbDictionary::Table *myTable= myDict->getTable(TABLE_NAME);
+ if(myTable == NULL)
+ APIERROR(myDict->getNdbError());
+
+ NdbTransaction* trans=myNdb->startTransaction();
+
+ if (trans == NULL)
+ {
+ APIERROR(trans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ /* Create an NdbRecord scan operation */
+ const NdbScanOperation* tabScan=
+ trans->scanTable(myTable->getDefaultRecord());
+
+ if (tabScan==NULL)
+ {
+ APIERROR(trans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ /* Now we hackily try to add a ScanFilter after the operation
+ * is defined. This will cause a failure within the
+ * constructor
+ */
+ NdbScanFilter brokenSf((NdbScanOperation*) tabScan);
+
+ /* Scan operation should have an error */
+ if (tabScan->getNdbError().code != 4536)
+ {
+ ndbout << "Expected error 4536, had error " <<
+ tabScan->getNdbError().code << " instead" << endl;
+ return NDBT_FAILED;
+ }
+
+ /* ScanFilter should have an error */
+ if (brokenSf.getNdbError().code != 4539)
+ {
+ ndbout << "Expected error 4539, had error " <<
+ brokenSf.getNdbError().code << " instead" << endl;
+ return NDBT_FAILED;
+ }
+
+ if (brokenSf.begin() != -1)
+ { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
+
+ if (brokenSf.istrue() != -1)
+ { ndbout << "Bad rc from istrue" << endl; return NDBT_FAILED; }
+
+ if (brokenSf.isfalse() != -1)
+ { ndbout << "Bad rc from isfalse" << endl; return NDBT_FAILED; }
+
+ if (brokenSf.isnull(0) != -1)
+ { ndbout << "Bad rc from isnull" << endl; return NDBT_FAILED; }
+
+ if (brokenSf.isnotnull(0) != -1)
+ { ndbout << "Bad rc from isnotnull" << endl; return NDBT_FAILED; }
+
+ if (brokenSf.cmp(NdbScanFilter::COND_EQ, 0, NULL, 0) != -1)
+ { ndbout << "Bad rc from cmp" << endl; return NDBT_FAILED; }
+
+ if (brokenSf.end() != -1)
+ { ndbout << "Bad rc from begin" << endl; return NDBT_FAILED; }
+
+ trans->close();
+
+ /* Now we check that we can define a ScanFilter before
+ * calling readTuples() for a scan operation
+ */
+ trans= myNdb->startTransaction();
+
+ if (trans == NULL)
+ {
+ APIERROR(trans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ /* Get an old Api table scan operation */
+ NdbScanOperation* tabScanOp=
+ trans->getNdbScanOperation(myTable);
+
+ if (tabScanOp==NULL)
+ {
+ APIERROR(trans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ /* Attempt to define a ScanFilter before calling readTuples() */
+ NdbScanFilter sf(tabScanOp);
+
+ /* Should be no problem ... */
+ if (sf.getNdbError().code != 0)
+ { APIERROR(sf.getNdbError()); return NDBT_FAILED; };
+
+
+ /* Ok, now attempt to define a ScanFilter against a primary key op */
+ NdbOperation* pkOp= trans->getNdbOperation(myTable);
+
+ if (pkOp == NULL)
+ {
+ APIERROR(trans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ NdbScanFilter sf2(pkOp);
+
+ if (sf2.getNdbError().code != 4539)
+ {
+ ndbout << "Error, expected 4539" << endl;
+ APIERROR(sf2.getNdbError());
+ return NDBT_FAILED;
+ }
+
+ return NDBT_OK;
+}
+
NDBT_TESTSUITE(testScanFilter);
TESTCASE(TEST_NAME,
"Scan table TABLE_NAME for the records which accord with \
@@ -903,6 +1029,7 @@ TESTCASE(TEST_NAME,
INITIALIZER(runPopulate);
INITIALIZER(runScanRandomFilterTest);
INITIALIZER(runMaxScanFilterSize);
+ INITIALIZER(runScanFilterConstructorFail);
FINALIZER(runDropTables);
}
=== modified file 'storage/ndb/test/ndbapi/test_event.cpp'
--- a/storage/ndb/test/ndbapi/test_event.cpp 2008-08-04 13:40:17 +0000
+++ b/storage/ndb/test/ndbapi/test_event.cpp 2008-08-05 10:52:53 +0000
@@ -23,14 +23,13 @@
#include <NdbRestarts.hpp>
#include <signaldata/DumpStateOrd.hpp>
-static int createEvent(Ndb *pNdb,
+static int createEvent(Ndb *pNdb,
const NdbDictionary::Table &tab,
- NDBT_Context* ctx)
+ bool merge_events,
+ bool report)
{
char eventName[1024];
sprintf(eventName,"%s_EVENT",tab.getName());
- bool merge_events = ctx->getProperty("MergeEvents");
- bool report = ctx->getProperty("ReportSubscribe");
NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
@@ -87,6 +86,16 @@ static int createEvent(Ndb *pNdb,
return NDBT_OK;
}
+static int createEvent(Ndb *pNdb,
+ const NdbDictionary::Table &tab,
+ NDBT_Context* ctx)
+{
+ bool merge_events = ctx->getProperty("MergeEvents");
+ bool report = ctx->getProperty("ReportSubscribe");
+
+ return createEvent(pNdb, tab, merge_events, report);
+}
+
static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
{
char eventName[1024];
@@ -2688,6 +2697,346 @@ runBug37442(NDBT_Context* ctx, NDBT_Step
return NDBT_OK;
}
+const NdbDictionary::Table* createBoringTable(const char* name, Ndb* pNdb)
+{
+ NdbDictionary::Table tab;
+
+ tab.setName(name);
+
+ NdbDictionary::Column pk;
+ pk.setName("Key");
+ pk.setType(NdbDictionary::Column::Unsigned);
+ pk.setLength(1);
+ pk.setNullable(false);
+ pk.setPrimaryKey(true);
+ tab.addColumn(pk);
+
+ NdbDictionary::Column attr;
+ attr.setName("Attr");
+ attr.setType(NdbDictionary::Column::Unsigned);
+ attr.setLength(1);
+ attr.setNullable(true);
+ attr.setPrimaryKey(false);
+ tab.addColumn(attr);
+
+ pNdb->getDictionary()->dropTable(tab.getName());
+ if(pNdb->getDictionary()->createTable(tab) == 0)
+ {
+ ndbout << (NDBT_Table&)tab << endl;
+ return pNdb->getDictionary()->getTable(tab.getName());
+ }
+
+ ndbout << "Table create failed, err : " <<
+ pNdb->getDictionary()->getNdbError().code << endl;
+
+ return NULL;
+}
+
+/* Types of operation which can be tagged via 'setAnyValue */
+enum OpTypes {Insert, Update, Write, Delete, EndOfOpTypes};
+
+/**
+ * executeOps
+ * Generate a number of PK operations of the supplied type
+ * using the passed operation options and setting the
+ * anyValue tag
+ */
+int
+executeOps(Ndb* pNdb,
+ const NdbDictionary::Table* tab,
+ OpTypes op,
+ Uint32 rowCount,
+ Uint32 keyOffset,
+ Uint32 anyValueOffset,
+ NdbOperation::OperationOptions opts)
+{
+ NdbTransaction* trans= pNdb->startTransaction();
+ const NdbRecord* record= tab->getDefaultRecord();
+
+ char RowBuf[16];
+ Uint32* keyPtr= (Uint32*) NdbDictionary::getValuePtr(record,
+ RowBuf,
+ 0);
+ Uint32* attrPtr= (Uint32*) NdbDictionary::getValuePtr(record,
+ RowBuf,
+ 1);
+
+ for (Uint32 i=keyOffset; i < (keyOffset + rowCount); i++)
+ {
+ *keyPtr= *attrPtr= i;
+ opts.optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE;
+ opts.anyValue= anyValueOffset + i;
+ bool allowInterpreted=
+ (op == Update) ||
+ (op == Delete);
+
+ if (!allowInterpreted)
+ opts.optionsPresent &=
+ ~ (Uint64) NdbOperation::OperationOptions::OO_INTERPRETED;
+
+ switch (op) {
+ case Insert :
+ if (trans->insertTuple(record,
+ RowBuf,
+ NULL,
+ &opts,
+ sizeof(opts)) == NULL)
+ {
+ g_err << "Can't create operation : " <<
+ trans->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+ break;
+ case Update :
+ if (trans->updateTuple(record,
+ RowBuf,
+ record,
+ RowBuf,
+ NULL,
+ &opts,
+ sizeof(opts)) == NULL)
+ {
+ g_err << "Can't create operation : " <<
+ trans->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+ break;
+ case Write :
+ if (trans->writeTuple(record,
+ RowBuf,
+ record,
+ RowBuf,
+ NULL,
+ &opts,
+ sizeof(opts)) == NULL)
+ {
+ g_err << "Can't create operation : " <<
+ trans->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+ break;
+ case Delete :
+ if (trans->deleteTuple(record,
+ RowBuf,
+ record,
+ NULL,
+ NULL,
+ &opts,
+ sizeof(opts)) == NULL)
+ {
+ g_err << "Can't create operation : " <<
+ trans->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+ break;
+ default:
+ g_err << "Bad operation type : " << op << endl;
+ return NDBT_FAILED;
+ }
+ }
+
+ trans->execute(Commit);
+
+ if (trans->getNdbError().code != 0)
+ {
+ g_err << "Error executing operations :" <<
+ trans->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+
+ trans->close();
+
+ return NDBT_OK;
+}
+
+int
+checkAnyValueInEvent(Ndb* pNdb,
+ NdbRecAttr* preKey,
+ NdbRecAttr* postKey,
+ NdbRecAttr* preAttr,
+ NdbRecAttr* postAttr,
+ Uint32 num,
+ Uint32 anyValueOffset,
+ bool checkPre)
+{
+ Uint32 received= 0;
+
+ while (received < num)
+ {
+ int pollRc;
+
+ if ((pollRc= pNdb->pollEvents(10000)) < 0)
+ {
+ g_err << "Error while polling for events : " <<
+ pNdb->getNdbError().code;
+ return NDBT_FAILED;
+ }
+
+ if (pollRc == 0)
+ {
+ printf("No event, waiting...\n");
+ continue;
+ }
+
+ NdbEventOperation* event;
+ while((event= pNdb->nextEvent()) != NULL)
+ {
+// printf("Event is %p of type %u\n",
+// event, event->getEventType());
+// printf("Got event, prekey is %u predata is %u \n",
+// preKey->u_32_value(),
+// preAttr->u_32_value());
+// printf(" postkey is %u postdata is %u anyvalue is %u\n",
+// postKey->u_32_value(),
+// postAttr->u_32_value(),
+// event->getAnyValue());
+
+ received ++;
+ Uint32 keyVal= (checkPre?
+ preKey->u_32_value() :
+ postKey->u_32_value());
+
+ if (event->getAnyValue() !=
+ (anyValueOffset + keyVal))
+ {
+ g_err << "Error : Got event, key is " <<
+ keyVal << " anyValue is " <<
+ event->getAnyValue() <<
+ " expected " << (anyValueOffset + keyVal)
+ << endl;
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ return NDBT_OK;
+}
+
+
+
+int
+runBug37672(NDBT_Context* ctx, NDBT_Step* step)
+{
+ /* InterpretedDelete and setAnyValue failed */
+ /* Let's create a boring, known table for this since
+ * we don't yet have Hugo tools for NdbRecord
+ */
+ BaseString name;
+ name.assfmt("TAB_TESTEVENT%d", rand() & 65535);
+ Ndb* pNdb= GETNDB(step);
+
+ const NdbDictionary::Table* tab= createBoringTable(name.c_str(), pNdb);
+
+ if (tab == NULL)
+ return NDBT_FAILED;
+
+ /* Create an event to listen to events on the table */
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT", tab->getName());
+
+ if (createEvent(pNdb, *tab, false, true) != 0)
+ return NDBT_FAILED;
+
+ /* Now create the event operation to retrieve the events */
+ NdbEventOperation* eventOp;
+ eventOp= pNdb->createEventOperation(eventName);
+
+ if (eventOp == NULL)
+ {
+ g_err << "Failed to create event operation :" <<
+ pNdb->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+
+ NdbRecAttr* eventKeyData= eventOp->getValue("Key");
+ NdbRecAttr* eventOldKeyData= eventOp->getPreValue("Key");
+ NdbRecAttr* eventAttrData= eventOp->getValue("Attr");
+ NdbRecAttr* eventOldAttrData= eventOp->getPreValue("Attr");
+
+ if ((eventKeyData == NULL) || (eventAttrData == NULL))
+ {
+ g_err << "Failed to get NdbRecAttrs for events" << endl;
+ return NDBT_FAILED;
+ };
+
+ if (eventOp->execute() != 0)
+ {
+ g_err << "Failed to execute event operation :" <<
+ eventOp->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+
+ /* Perform some operations on the table, and check
+ * that we get the correct AnyValues propagated
+ * through
+ */
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent= 0;
+
+ NdbInterpretedCode nonsenseProgram;
+
+ nonsenseProgram.load_const_u32(0, 0);
+ nonsenseProgram.interpret_exit_ok();
+
+ nonsenseProgram.finalise();
+
+ const Uint32 rowCount= 1500;
+ Uint32 keyOffset= 0;
+ Uint32 anyValueOffset= 100;
+
+ printf ("Testing AnyValue with no interpreted program\n");
+ for (int variants= 0; variants < 2; variants ++)
+ {
+ for (int op= Insert; op < EndOfOpTypes; op++)
+ {
+ printf(" Testing opType %d (ko=%d, ao=%d)...",
+ op, keyOffset, anyValueOffset);
+
+ if (executeOps(pNdb,
+ tab,
+ (OpTypes)op,
+ rowCount,
+ keyOffset,
+ anyValueOffset,
+ opts))
+ return NDBT_FAILED;
+
+ if (checkAnyValueInEvent(pNdb,
+ eventOldKeyData, eventKeyData,
+ eventOldAttrData, eventAttrData,
+ rowCount,
+ anyValueOffset,
+ false // always use postKey data
+ ) != NDBT_OK)
+ return NDBT_FAILED;
+ printf("ok\n");
+ };
+
+ printf("Testing AnyValue with interpreted program\n");
+ opts.optionsPresent|= NdbOperation::OperationOptions::OO_INTERPRETED;
+ opts.interpretedCode= &nonsenseProgram;
+ }
+
+ if (dropEventOperations(pNdb) != 0)
+ {
+ g_err << "Dropping event operations failed : " <<
+ pNdb->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+
+ if (dropEvent(pNdb, tab->getName()) != 0)
+ {
+ g_err << "Dropping event failed : " <<
+ pNdb->getDictionary()->getNdbError().code << endl;
+ return NDBT_FAILED;
+ }
+
+ pNdb->getDictionary()->dropTable(tab->getName());
+
+ return NDBT_OK;
+}
+
+
NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation",
"Verify that we can listen to Events"
@@ -2879,6 +3228,10 @@ TESTCASE("Bug37442", "")
{
INITIALIZER(runBug37442);
}
+TESTCASE("Bug37672", "NdbRecord option OO_ANYVALUE causes interpreted delete to abort.")
+{
+ INITIALIZER(runBug37672);
+}
NDBT_TESTSUITE_END(test_event);
int main(int argc, const char** argv){
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt 2008-07-01 15:01:50 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2008-08-07 12:32:06 +0000
@@ -629,7 +629,7 @@ args: -n CreateAndDropDuring T6 D1 D2
max-time: 1500
cmd: testDict
-args: -n CreateInvalidTables
+args: -n CreateInvalidTables T1
max-time: 1500
cmd: testDict
@@ -1197,6 +1197,16 @@ max-time: 500
cmd: testBasic
args: -n PkUpdate WIDE_MAXKEY_HUGO WIDE_MAXATTR_HUGO WIDE_MAXKEYMAXCOLS_HUGO WIDE_MINKEYMAXCOLS_HUGO
-
# EOF 2008-06-30
-# EOF
+
+max-time: 500
+cmd: test_event
+args -n bug37672 T1
+
+#EOF 2008-07-04
+
+max-time: 500
+cmd: testScanFilter
+args:
+
+#EOF 2008-07-09
=== modified file 'storage/ndb/tools/restore/Restore.cpp'
--- a/storage/ndb/tools/restore/Restore.cpp 2008-06-02 13:27:27 +0000
+++ b/storage/ndb/tools/restore/Restore.cpp 2008-08-05 14:34:39 +0000
@@ -1365,13 +1365,19 @@ bool RestoreDataIterator::readFragmentHe
if (Header.SectionType == BackupFormat::EMPTY_ENTRY)
{
void *tmp;
- buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
+ if (Header.SectionLength < 2)
+ {
+ err << "getFragmentFooter:Error reading fragment footer" << endl;
+ return false;
+ }
+ if (Header.SectionLength > 2)
+ buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
continue;
}
break;
}
/* read rest of header */
- if (buffer_read(((char*)&Header)+8, sizeof(Header)-8, 1) != 1)
+ if (buffer_read(((char*)&Header)+8, Header.SectionLength*4-8, 1) != 1)
{
ret = 0;
return false;
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.4 branch (msvensson:2700) | Magnus Svensson | 8 Aug |