Below is the list of changes that have just been committed into a local
5.1 repository of justin. When justin does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2008-04-24 17:05:52+08:00, justin@stripped +29 -0
WL#3085 optimize table (move fixpart)
1) use two scans to move fixpart and varpart
2) add a descending Tuple scan
3) add a MOVE operation (need to do NR/SR/Abort stuff)
4) add more cases into ndb_optimize_table.test
(Commit again, just now use david's machine after he left,
and forgot to change configuration, :( )
mysql-test/suite/ndb/r/ndb_optimize_table.result@stripped, 2008-04-24 17:04:28+08:00,
justin@stripped +68 -0
WL#3085 optimize table (move fixpart)
update testcase results
mysql-test/suite/ndb/t/ndb_optimize_table.test@stripped, 2008-04-24 17:04:39+08:00,
justin@stripped +152 -0
WL#3085 optimize table (move fixpart)
add case for move varpart
add case for move fixpart
add case for tuple data consistency validate after optimize
sql/ha_ndbcluster.cc@stripped, 2008-04-24 17:04:45+08:00, justin@stripped +3 -0
WL#3085 optimize table (move fixpart)
add clean-up close() when initialization fails
storage/ndb/include/kernel/AttributeHeader.hpp@stripped, 2008-04-24 17:04:47+08:00,
justin@stripped +1 -0
WL#3085 optimize table (move fixpart)
add a pseudo column FIXPAGE_SIZE
storage/ndb/include/kernel/kernel_types.h@stripped, 2008-04-24 17:04:53+08:00,
justin@stripped +1 -0
WL#3085 optimize table (move fixpart)
add MOVE definition
storage/ndb/include/ndbapi/NdbDictionary.hpp@stripped, 2008-04-24 17:04:54+08:00,
justin@stripped +1 -0
WL#3085 optimize table (move fixpart)
add pseudo column FIXPAGE_SIZE
storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2008-04-24 17:04:54+08:00,
justin@stripped +19 -2
WL#3085 optimize table (move fixpart)
add optimization flag for MOVE operation
storage/ndb/include/ndbapi/NdbScanOperation.hpp@stripped, 2008-04-24 17:04:54+08:00,
justin@stripped +25 -0
WL#3085 optimize table (move fixpart)
add a moveCurrentTuple() interface
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp@stripped, 2008-04-24 17:05:00+08:00,
justin@stripped +23 -1
WL#3085 optimize table (move fixpart)
add MOVE and update operation rec for moving fixpart
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp@stripped, 2008-04-24 17:05:01+08:00,
justin@stripped +18 -0
WL#3085 optimize table (move fixpart)
update acc after moving fixpart
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2008-04-24 17:05:02+08:00,
justin@stripped +8 -1
WL#3085 optimize table (move fixpart)
add MOVE
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2008-04-24 17:05:03+08:00,
justin@stripped +1 -0
WL#3085 optimize table (move fixpart)
add MOVE
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2008-04-24 17:05:04+08:00,
justin@stripped +17 -0
WL#3085 optimize table (move fixpart)
add move handler in tup block
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2008-04-24 17:05:04+08:00,
justin@stripped +32 -1
WL#3085 optimize table (move fixpart)
update acckey and move fixpart
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2008-04-24 17:05:06+08:00,
justin@stripped +181 -19
WL#3085 optimize table (move fixpart)
add move operation handler
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp@stripped, 2008-04-24 17:05:17+08:00,
justin@stripped +10 -4
WL#3085 optimize table (move fixpart)
add read pseudo column for OPTIMIZE and FIXPAGE_SIZE
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp@stripped, 2008-04-24 17:05:22+08:00,
justin@stripped +26 -8
WL#3085 optimize table (move fixpart)
add descending tuple scan
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp@stripped, 2008-04-24 17:05:23+08:00,
justin@stripped +29 -0
WL#3085 optimize table (move fixpart)
add ordered index entry
storage/ndb/src/ndbapi/NdbBlob.cpp@stripped, 2008-04-24 17:05:24+08:00, justin@stripped
+2 -0
WL#3085 optimize table (move fixpart)
add MOVE
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2008-04-24 17:05:25+08:00,
justin@stripped +504 -128
WL#3085 optimize table (move fixpart)
add two scans to move fixpart and varpart,
forward scan to find holes,
backward scan fill up these holes
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2008-04-24 17:05:25+08:00,
justin@stripped +238 -2
WL#3085 optimize table (move fixpart)
add two scan definition
storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2008-04-24 17:05:26+08:00,
justin@stripped +1 -0
WL#3085 optimize table (move fixpart)
add flag for MOVE
storage/ndb/src/ndbapi/NdbOperationDefine.cpp@stripped, 2008-04-24 17:05:33+08:00,
justin@stripped +7 -7
WL#3085 optimize table (move fixpart)
use new NdbRecord instead of old interface for OPTIMIZE
storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2008-04-24 17:05:34+08:00,
justin@stripped +18 -1
WL#3085 optimize table (move fixpart)
use new NdbRecord to MOVE
storage/ndb/src/ndbapi/NdbOperationSearch.cpp@stripped, 2008-04-24 17:05:39+08:00,
justin@stripped +1 -1
WL#3085 optimize table (move fixpart)
add MOVE
storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2008-04-24 17:05:40+08:00,
justin@stripped +38 -7
WL#3085 optimize table (move fixpart)
add MOVE
storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2008-04-24 17:05:40+08:00,
justin@stripped +1 -0
WL#3085 optimize table (move fixpart)
add MOVE
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp@stripped, 2008-04-24 17:05:41+08:00,
justin@stripped +4 -0
WL#3085 optimize table (move fixpart)
add pseudo column FIXPAGE_SIZE
storage/ndb/tools/select_all.cpp@stripped, 2008-04-24 17:05:41+08:00, justin@stripped +4
-0
WL#3085 optimize table (move fixpart)
diff -Nrup a/mysql-test/suite/ndb/r/ndb_optimize_table.result
b/mysql-test/suite/ndb/r/ndb_optimize_table.result
--- a/mysql-test/suite/ndb/r/ndb_optimize_table.result 2008-03-27 19:54:10 +08:00
+++ b/mysql-test/suite/ndb/r/ndb_optimize_table.result 2008-04-24 17:04:28 +08:00
@@ -29,7 +29,75 @@ Table Op Msg_type Msg_text
test.t1 optimize status OK
select data_length from information_schema.tables where table_name like 't1' into
@data_length2;
select 100*(@data_length-@data_length2)/@data_length into @opt_level;
+select @opt_level > 60;
+@opt_level > 60
+1
+drop table t1;
+CREATE TABLE t1 (
+pk INT NOT NULL AUTO_INCREMENT,
+attr1 VARCHAR(1024),
+attr2 VARBINARY(1024),
+attr3 BLOB,
+attr4 CHAR(255),
+PRIMARY KEY pk(pk) USING HASH,
+UNIQUE INDEX ui(pk, attr1, attr2)
+) ROW_FORMAT=DYNAMIC ENGINE=ndbcluster;
+set @val3 =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+set @old_ndb_autoincrement_prefetch_sz = @@session.ndb_autoincrement_prefetch_sz;
+set ndb_autoincrement_prefetch_sz = 256;
+set ndb_autoincrement_prefetch_sz = @old_ndb_autoincrement_prefetch_sz;
+select count(*) from t1;
+count(*)
+10000
+delete from t1 where pk < 1000 and pk %5 != 0;
+delete from t1 where pk < 2000 and pk %5 != 0;
+delete from t1 where pk < 3000 and pk %5 != 0;
+delete from t1 where pk < 4000 and pk %5 != 0;
+delete from t1 where pk < 5000 and pk %5 != 0;
+delete from t1 where pk < 6000 and pk %5 != 0;
+delete from t1 where pk < 7000 and pk %5 != 0;
+delete from t1 where pk < 8000 and pk %5 != 0;
+delete from t1 where pk < 9000 and pk %5 != 0;
+delete from t1 where pk < 10000 and pk %5 != 0;
+select count(*) from t1;
+count(*)
+2000
+select data_length from information_schema.tables where table_name like 't1' into
@data_length;
+set ndb_optimization_delay = 0;
+optimize table t1;
+Table Op Msg_type Msg_text
+test.t1 optimize status OK
+select data_length from information_schema.tables where table_name like 't1' into
@data_length2;
+select 100*(@data_length-@data_length2)/@data_length into @opt_level;
+select @opt_level > 50;
+@opt_level > 50
+1
+select count(*) from t1;
+count(*)
+2000
+truncate table t1;
+set @old_ndb_autoincrement_prefetch_sz = @@session.ndb_autoincrement_prefetch_sz;
+set ndb_autoincrement_prefetch_sz = 256;
+set @val = "This is MySQL Cluster.";
+set ndb_autoincrement_prefetch_sz = @old_ndb_autoincrement_prefetch_sz;
+select count(*) from t1;
+count(*)
+10000
+set @num=0;
+select count(*) from t1;
+count(*)
+10000
+select data_length from information_schema.tables where table_name like 't1' into
@data_length;
+set ndb_optimization_delay = 0;
+optimize table t1;
+Table Op Msg_type Msg_text
+test.t1 optimize status OK
+select data_length from information_schema.tables where table_name like 't1' into
@data_length2;
+select 100*(@data_length-@data_length2)/@data_length into @opt_level;
select @opt_level > 30;
@opt_level > 30
1
+select count(*) from t1;
+count(*)
+10000
drop table t1;
diff -Nrup a/mysql-test/suite/ndb/t/ndb_optimize_table.test
b/mysql-test/suite/ndb/t/ndb_optimize_table.test
--- a/mysql-test/suite/ndb/t/ndb_optimize_table.test 2008-03-27 19:54:10 +08:00
+++ b/mysql-test/suite/ndb/t/ndb_optimize_table.test 2008-04-24 17:04:39 +08:00
@@ -5,6 +5,11 @@
DROP TABLE IF EXISTS t1;
--enable_warnings
+###################################################################################
+# Case 1: test for page release #
+# if a page become empty, NDB kernel will reclaim the page automatically #
+###################################################################################
+
set @val =
"0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
set @val2 =
0x0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF;
@@ -46,6 +51,9 @@ delete from t1 where pk < 6000 order by
delete from t1 where pk < 7000 order by pk;
delete from t1 where pk < 7500 order by pk;
+#
+# compare memory utilization before and after optimize
+#
select data_length from information_schema.tables where table_name like 't1' into
@data_length;
set ndb_optimization_delay = 0;
@@ -56,6 +64,150 @@ select data_length from information_sche
select 100*(@data_length-@data_length2)/@data_length into @opt_level;
#select @data_length,@data_length2,@opt_level;
+select @opt_level > 60;
+
+drop table t1;
+
+######################################################################
+# Case 2: test for moving both varpart and fixpart to fill in holes #
+# basically these holes are produced by deletions. #
+# and verify data consistency after optimize #
+######################################################################
+CREATE TABLE t1 (
+ pk INT NOT NULL AUTO_INCREMENT,
+ attr1 VARCHAR(1024),
+ attr2 VARBINARY(1024),
+ attr3 BLOB,
+ attr4 CHAR(255),
+ PRIMARY KEY pk(pk) USING HASH,
+ UNIQUE INDEX ui(pk, attr1, attr2)
+) ROW_FORMAT=DYNAMIC ENGINE=ndbcluster;
+
+set @val3 =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+#
+# insert records into table
+#
+set @old_ndb_autoincrement_prefetch_sz = @@session.ndb_autoincrement_prefetch_sz;
+set ndb_autoincrement_prefetch_sz = 256;
+let $1=1000;
+disable_query_log;
+while ($1)
+{
+ eval insert into t1(attr1, attr2, attr3, attr4) values (@val3, @val2, @val3,
@val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3,
@val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3,
@val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3,
@val);
+ dec $1;
+}
+enable_query_log;
+set ndb_autoincrement_prefetch_sz = @old_ndb_autoincrement_prefetch_sz;
+
+select count(*) from t1;
+#
+# delete some rows and let these holes disperse in each page
+#
+delete from t1 where pk < 1000 and pk %5 != 0;
+delete from t1 where pk < 2000 and pk %5 != 0;
+delete from t1 where pk < 3000 and pk %5 != 0;
+delete from t1 where pk < 4000 and pk %5 != 0;
+delete from t1 where pk < 5000 and pk %5 != 0;
+delete from t1 where pk < 6000 and pk %5 != 0;
+delete from t1 where pk < 7000 and pk %5 != 0;
+delete from t1 where pk < 8000 and pk %5 != 0;
+delete from t1 where pk < 9000 and pk %5 != 0;
+delete from t1 where pk < 10000 and pk %5 != 0;
+
+select count(*) from t1;
+
+#
+# compare memory utilization before and after optimize
+#
+select data_length from information_schema.tables where table_name like 't1' into
@data_length;
+
+# save original tuple data before moving
+--exec $NDB_TOOLS_DIR/ndb_select_all --no-defaults
--ndb-connectstring="localhost:$NDBCLUSTER_PORT" -d test t1 -o ui >
$MYSQLTEST_VARDIR/tmp/before_ndb_optimize_table.dat
+
+set ndb_optimization_delay = 0;
+optimize table t1;
+
+# save new tuple data after moving
+--exec $NDB_TOOLS_DIR/ndb_select_all --no-defaults
--ndb-connectstring="localhost:$NDBCLUSTER_PORT" -d test t1 -o ui >
$MYSQLTEST_VARDIR/tmp/after_ndb_optimize_table.dat
+
+# compare the tuple data before and after optimize table
+diff_files $MYSQLTEST_VARDIR/tmp/before_ndb_optimize_table.dat
$MYSQLTEST_VARDIR/tmp/after_ndb_optimize_table.dat;
+
+select data_length from information_schema.tables where table_name like 't1' into
@data_length2;
+
+select 100*(@data_length-@data_length2)/@data_length into @opt_level;
+
+select @opt_level > 50;
+
+select count(*) from t1;
+
+##########################################################
+# Case 3: test for moving only varpart to fill in holes #
+# basically these holes are produced by updates #
+# and verify data consistency after optimize #
+##########################################################
+truncate table t1;
+
+#
+# insert records into table
+#
+set @old_ndb_autoincrement_prefetch_sz = @@session.ndb_autoincrement_prefetch_sz;
+set ndb_autoincrement_prefetch_sz = 256;
+set @val = "This is MySQL Cluster.";
+let $1=1000;
+disable_query_log;
+while ($1)
+{
+ eval insert into t1(attr1, attr2, attr3, attr4) values (@val3, @val2, @val3,
@val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3,
@val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3,
@val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3, @val),(@val3, @val2, @val3,
@val);
+ dec $1;
+}
+enable_query_log;
+set ndb_autoincrement_prefetch_sz = @old_ndb_autoincrement_prefetch_sz;
+select count(*) from t1;
+
+#
+# update column from big size into small size
+#
+let $count=1000;
+set @num=0;
+disable_query_log;
+while ($count)
+{
+ eval update t1 set attr1='MySQL$count',attr2=@num,attr3='Blob$count',attr4='Char$count'
where pk <= @num + 20 and pk > @num;
+ set @num = @num + 20;
+ dec $count;
+}
+enable_query_log;
+
+# save original tuple data before moving
+--exec $NDB_TOOLS_DIR/ndb_select_all --no-defaults
--ndb-connectstring="localhost:$NDBCLUSTER_PORT" -d test t1 -o ui >
$MYSQLTEST_VARDIR/tmp/before_ndb_optimize_table.dat
+
+select count(*) from t1;
+
+#
+# compare memory utilization before and after optimize
+#
+select data_length from information_schema.tables where table_name like 't1' into
@data_length;
+
+set ndb_optimization_delay = 0;
+optimize table t1;
+
+select data_length from information_schema.tables where table_name like 't1' into
@data_length2;
+
+select 100*(@data_length-@data_length2)/@data_length into @opt_level;
+
select @opt_level > 30;
+
+# save new tuple data after moving
+--exec $NDB_TOOLS_DIR/ndb_select_all --no-defaults
--ndb-connectstring="localhost:$NDBCLUSTER_PORT" -d test t1 -o ui >
$MYSQLTEST_VARDIR/tmp/after_ndb_optimize_table.dat
+
+select count(*) from t1;
+
+# compare the tuple data before and after optimize table
+diff_files $MYSQLTEST_VARDIR/tmp/before_ndb_optimize_table.dat
$MYSQLTEST_VARDIR/tmp/after_ndb_optimize_table.dat;
+
+--exec rm -f $MYSQLTEST_VARDIR/tmp/before_ndb_optimize_table.dat
+--exec rm -f $MYSQLTEST_VARDIR/tmp/after_ndb_optimize_table.dat
drop table t1;
diff -Nrup a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
--- a/sql/ha_ndbcluster.cc 2008-04-11 00:49:48 +08:00
+++ b/sql/ha_ndbcluster.cc 2008-04-24 17:04:45 +08:00
@@ -7616,6 +7616,7 @@ int ha_ndbcluster::ndb_optimize_table(TH
{
DBUG_PRINT("info",
("Optimze table %s returned %d", m_tabname, error));
+ th.close();
ERR_RETURN(ndb->getNdbError());
}
while((result= th.next()) == 1)
@@ -7646,6 +7647,7 @@ int ha_ndbcluster::ndb_optimize_table(TH
DBUG_PRINT("info",
("Optimze index %s returned %d",
index->getName(), error));
+ ih.close();
ERR_RETURN(ndb->getNdbError());
}
@@ -7669,6 +7671,7 @@ int ha_ndbcluster::ndb_optimize_table(TH
DBUG_PRINT("info",
("Optimze unique index %s returned %d",
unique_index->getName(), error));
+ ih.close();
ERR_RETURN(ndb->getNdbError());
}
while((result= ih.next()) == 1)
diff -Nrup a/storage/ndb/include/kernel/AttributeHeader.hpp
b/storage/ndb/include/kernel/AttributeHeader.hpp
--- a/storage/ndb/include/kernel/AttributeHeader.hpp 2008-03-18 15:12:37 +08:00
+++ b/storage/ndb/include/kernel/AttributeHeader.hpp 2008-04-24 17:04:47 +08:00
@@ -50,6 +50,7 @@ public:
STATIC_CONST( COPY_ROWID = 0xFFF1 );
STATIC_CONST( READ_ALL = 0xFFF0 );
STATIC_CONST( READ_LCP = 0xFFEF );
+ STATIC_CONST( FIXPAGE_SIZE = 0xFFEE );
/**
* Optimize pseudo column and optimization options
diff -Nrup a/storage/ndb/include/kernel/kernel_types.h
b/storage/ndb/include/kernel/kernel_types.h
--- a/storage/ndb/include/kernel/kernel_types.h 2006-12-24 03:20:03 +08:00
+++ b/storage/ndb/include/kernel/kernel_types.h 2008-04-24 17:04:53 +08:00
@@ -35,6 +35,7 @@ enum Operation_t {
#if 0
,ZREAD_CONSISTENT = 6
#endif
+ ,ZMOVE = 7
};
/**
diff -Nrup a/storage/ndb/include/ndbapi/NdbDictionary.hpp
b/storage/ndb/include/ndbapi/NdbDictionary.hpp
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp 2008-04-08 18:44:16 +08:00
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp 2008-04-24 17:04:54 +08:00
@@ -572,6 +572,7 @@ public:
static const Column * ANY_VALUE;
static const Column * COPY_ROWID;
static const Column * OPTIMIZE;
+ static const Column * FIXPAGE_SIZE;
int getSizeInBytes() const;
diff -Nrup a/storage/ndb/include/ndbapi/NdbOperation.hpp
b/storage/ndb/include/ndbapi/NdbOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp 2008-02-20 20:30:12 +08:00
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp 2008-04-24 17:04:54 +08:00
@@ -419,7 +419,6 @@ public:
int setValue(const char* anAttrName, double aValue);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
int setAnyValue(Uint32 aValue);
- int setOptimize(Uint32 options);
#endif
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
@@ -869,6 +868,7 @@ public:
DeleteRequest = 3, ///< Delete Operation
WriteRequest = 4, ///< Write Operation
ReadExclusive = 5, ///< Read exclusive
+ MoveRequest = 7, ///< Move operation, equal to ZMOVE in
kernel_types.h
OpenScanRequest, ///< Scan Operation
OpenRangeScanRequest, ///< Range scan operation
NotDefined2, ///< Internal for debugging
@@ -986,7 +986,8 @@ public:
OO_PARTITION_ID = 0x08,
OO_INTERPRETED = 0x10,
OO_ANYVALUE = 0x20,
- OO_CUSTOMDATA = 0x40 };
+ OO_CUSTOMDATA = 0x40,
+ OO_OPTIMIZE = 0x80};
/* An operation-specific abort option.
* Only necessary if the default abortoption behaviour
@@ -1015,6 +1016,19 @@ public:
/* customData ptr for this operation */
void * customData;
+
+ /* 64 bits table optimization flags
+ * high 32bits stand for page no. of ROWID;
+ * in low 32bits, high 16bits stand for page index of ROWID,
+ * low 16bits stand for flags of optimizing table;
+ * when optimization flags stand for VARPART, ROWID bits are applicable;
+ * 0123456701234567012345670123456701234567012345670123456701234567
+ * ppppppppppppppppppppppppppppppppiiiiiiiiiiiiiiiiffffffffffffffff
+ * p: page no. of ROWID when move fixpart to
+ * i: page idx of ROWID when move fixpart to
+ * f: flags of optimization, such as optimize varpart, optimize fixpart, etc.
+ */
+ Uint64 optimizeFlags;
};
@@ -1418,6 +1432,9 @@ protected:
Uint32 m_numExtraSetValues;
Uint32 m_any_value; // Valid if m_use_any_value!=0
+
+ bool m_use_optimize_flags;
+ Uint64 m_optimize_flags;
// Blobs in this operation
NdbBlob* theBlobList;
diff -Nrup a/storage/ndb/include/ndbapi/NdbScanOperation.hpp
b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp 2008-04-05 00:18:05 +08:00
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp 2008-04-24 17:04:54 +08:00
@@ -55,6 +55,10 @@ public:
*/
SF_OrderBy = (1 << 24),
/* Index scan in descending order, instead of default ascending. */
+ /*
+ Now we also use SF_TupScan OR SF_Descending flags to implement
+ a reversed order of logical storage location of tuple
+ */
SF_Descending = (2 << 24),
/*
Enable @ref get_range_no (index scan only).
@@ -364,6 +368,27 @@ public:
const unsigned char *mask= 0,
const NdbOperation::OperationOptions *opts = 0,
Uint32 sizeOfOptions = 0);
+
+ /*
+ * Moving current tuple include two aspects:
+ * 1) move varpart:
+ * let kernel decide whether the varpart of tuple need to be moved and where to move
+ * 2) move fixpart:
+ * we specify the rowid of destination where move the fixpart of tuple to
+ *
+ * Parameters:
+ * Uint32 optimize_option:
+ * == 1, to move varpart
+ * == 2, to move fixpart
+ * == 3, to move both varpart and fixpart
+ * Uint32 page_no, Uint32 page_idx:
+ * the rowid of destination tuple where we want move current tuple to
+ */
+ const NdbOperation *moveCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *attr_rec,
+ const Uint32 optimize_option,
+ const Uint32 page_no,
+ const Uint32 page_idx);
/* Delete the current tuple. NdbRecord version.
* The tuple can be read before being deleted. Specify the columns to read
diff -Nrup a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2008-02-03 21:16:33 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2008-04-24 17:05:00 +08:00
@@ -1099,6 +1099,7 @@ void Dbacc::execACCKEYREQ(Signal* signal
switch (op) {
case ZREAD:
case ZUPDATE:
+ case ZMOVE:
case ZDELETE:
case ZWRITE:
case ZSCAN_OP:
@@ -1145,6 +1146,22 @@ void Dbacc::execACCKEYREQ(Signal* signal
return;
} else {
jam();
+ if (op == ZMOVE) {
+ jam();
+ //TODO, for fixpart moving
+ /**
+ * We need update element data since we need to update
+ * acc key after moving fixpart of optimizing table,
+ * and this should not influence lock issue.
+ * On Non-primary fragments, need to update forcibly,
+ * because normally cannot get lock in commit phase.
+ */
+ Uint32 eh = gePageptr.p->word32[tgeElementptr];
+ operationRecPtr.p->elementPage = gePageptr.i;
+ operationRecPtr.p->elementContainer = tgeContainerptr;
+ operationRecPtr.p->elementPointer = tgeElementptr;
+ operationRecPtr.p->elementIsforward = tgeForward;
+ }
accIsLockedLab(signal, lockOwnerPtr);
return;
}//if
@@ -1174,6 +1191,7 @@ void Dbacc::execACCKEYREQ(Signal* signal
break;
case ZREAD:
case ZUPDATE:
+ case ZMOVE:
case ZDELETE:
case ZSCAN_OP:
jam();
@@ -1812,6 +1830,7 @@ operator<<(NdbOut & out, Dbacc::Operatio
case ZREAD: out << "READ "; read = true; break;
case ZINSERT: out << "INSERT "; break;
case ZUPDATE: out << "UPDATE "; break;
+ case ZMOVE: out << "MOVE "; break;
case ZDELETE: out << "DELETE "; break;
case ZWRITE: out << "WRITE "; break;
case ZSCAN_OP: out << "SCAN "; read = true; break;
@@ -2273,6 +2292,7 @@ void Dbacc::execACCMINUPDATE(Signal* sig
operationRecPtr.i = signal->theData[0];
tlocalkey1 = signal->theData[1];
tlocalkey2 = signal->theData[2];
+ const bool update_in_run = signal->theData[3];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
Uint32 opbits = operationRecPtr.p->m_op_bits;
fragrecptr.i = operationRecPtr.p->fragptr;
@@ -2280,7 +2300,9 @@ void Dbacc::execACCMINUPDATE(Signal* sig
tulkLocalPtr = operationRecPtr.p->elementPointer +
operationRecPtr.p->elementIsforward;
- if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING)
+ if (update_in_run ?
+ (opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING :
+ (opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED)
{
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(ulkPageidptr, cpagesize, page8);
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2007-12-18 21:19:41 +08:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-04-24 17:05:01 +08:00
@@ -2908,6 +2908,7 @@ public:
bool is_same_trans(Uint32 opId, Uint32 trid1, Uint32 trid2);
void get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo);
void accminupdate(Signal*, Uint32 opPtrI, const Local_key*);
+ void move_accminupdate(Signal*, Uint32 opPtrI, const Local_key*);
/**
*
@@ -3027,11 +3028,28 @@ Dblqh::accminupdate(Signal* signal, Uint
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
signal->theData[0] = regTcPtr.p->accConnectrec;
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS |
key->m_page_idx;
+ signal->theData[3] = true;
c_acc->execACCMINUPDATE(signal);
if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key;
+}
+
+/*
+ * update acckey in commit phase only for move operation
+ */
+inline
+void
+Dblqh::move_accminupdate(Signal* signal, Uint32 opId, const Local_key* key)
+{
+ TcConnectionrecPtr regTcPtr;
+ regTcPtr.i= opId;
+ ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+ signal->theData[0] = regTcPtr.p->accConnectrec;
+ signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS |
key->m_page_idx;
+ signal->theData[3] = false;
+ c_acc->execACCMINUPDATE(signal);
}
inline
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-04-23 22:32:26 +08:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-04-24 17:05:02 +08:00
@@ -128,6 +128,7 @@ operator<<(NdbOut& out, Operation_t op)
case ZREAD_EX: out << "READ-EX"; break;
case ZINSERT: out << "INSERT"; break;
case ZUPDATE: out << "UPDATE"; break;
+ case ZMOVE: out << "MOVE"; break;
case ZDELETE: out << "DELETE"; break;
case ZWRITE: out << "WRITE"; break;
}
@@ -3730,7 +3731,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->m_disk_table = tabptr.p->m_disk_table;
if(refToBlock(signal->senderBlockRef()) == RESTORE)
regTcPtr->m_disk_table &= !LqhKeyReq::getNoDiskFlag(Treqinfo);
- else if(op == ZREAD || op == ZREAD_EX || op == ZUPDATE)
+ else if(op == ZREAD || op == ZREAD_EX || op == ZUPDATE || op == ZMOVE)
regTcPtr->m_disk_table &= !LqhKeyReq::getNoDiskFlag(Treqinfo);
tabptr.p->usageCount++;
@@ -3982,6 +3983,7 @@ void Dblqh::prepareContinueAfterBlockedL
switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
+ case ZMOVE: TRACENR("MOVE"); break;
case ZWRITE: TRACENR("WRITE"); break;
case ZINSERT: TRACENR("INSERT"); break;
case ZDELETE: TRACENR("DELETE"); break;
@@ -6550,6 +6552,7 @@ void Dblqh::commitContinueAfterBlockedLa
switch (regTcPtr.p->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
+ case ZMOVE: TRACENR("MOVE"); break;
case ZWRITE: TRACENR("WRITE"); break;
case ZINSERT: TRACENR("INSERT"); break;
case ZDELETE: TRACENR("DELETE"); break;
@@ -7043,6 +7046,7 @@ void Dblqh::execACCKEYREF(Signal* signal
switch (tcPtr->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
+ case ZMOVE: TRACENR("MOVE"); break;
case ZWRITE: TRACENR("WRITE"); break;
case ZINSERT: TRACENR("INSERT"); break;
case ZDELETE: TRACENR("DELETE"); break;
@@ -15954,6 +15958,7 @@ void Dblqh::logLqhkeyrefLab(Signal* sign
Uint32 result = returnExecLog(signal);
switch (tcConnectptr.p->operation) {
case ZUPDATE:
+ case ZMOVE:
case ZDELETE:
jam();
if (unlikely(terrorCode != ZNO_TUPLE_FOUND))
@@ -15993,6 +15998,7 @@ error:
" Failed op (%s) during REDO table: %d fragment: %d err: %d",
tcConnectptr.p->operation == ZINSERT ? "INSERT" :
tcConnectptr.p->operation == ZUPDATE ? "UPDATE" :
+ tcConnectptr.p->operation == ZMOVE ? "MOVE" :
tcConnectptr.p->operation == ZDELETE ? "DELETE" :
tcConnectptr.p->operation == ZWRITE ? "WRITE" : "<unknown>",
tcConnectptr.p->tableref,
@@ -19063,6 +19069,7 @@ Dblqh::match_and_print(Signal* signal, P
break;
case ZINSERT: op = "INSERT"; break;
case ZUPDATE: op = "UPDATE"; break;
+ case ZMOVE : op = "MOVE" ; break;
case ZDELETE: op = "DELETE"; break;
case ZWRITE: op = "WRITE"; break;
}
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-04-23 22:32:26 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-04-24 17:05:03 +08:00
@@ -2931,6 +2931,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
c_counters.cwriteCount = TwriteCount + 1;
switch (TOperationType) {
case ZUPDATE:
+ case ZMOVE:
case ZINSERT:
case ZDELETE:
case ZWRITE:
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-04-07 13:44:28 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2008-04-24 17:05:04 +08:00
@@ -516,6 +516,7 @@ typedef Ptr<Fragoperrec> FragoperrecPtr;
SCAN_DD = 0x01, // scan disk pages
SCAN_VS = 0x02, // page format is var size
SCAN_LCP = 0x04, // LCP mem page scan
+ SCAN_DESCEND = 0x08, // scan in descending order for mem tuple
SCAN_LOCK_SH = 0x10, // lock mode shared
SCAN_LOCK_EX = 0x20, // lock mode exclusive
SCAN_LOCK_WAIT = 0x40, // lock wait
@@ -795,6 +796,7 @@ struct Operationrec {
*/
Local_key m_tuple_location;
Local_key m_copy_tuple_location;
+ Local_key m_move_tuple_location;
/*
* We keep the record linked to the operation record in LQH.
@@ -1572,6 +1574,7 @@ struct KeyReqStruct {
PagePtr m_disk_page_ptr; //
Local_key m_row_id;
Uint32 optimize_options;
+ Local_key optimize_row_id; /* move fixpart to when optimizing table */
bool dirty_op;
bool interpreted_exec;
@@ -1990,6 +1993,15 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
+ int handleMoveReq(Signal* signal,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
+ KeyReqStruct* req_struct,
+ bool disk);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
int updateStartLab(Signal* signal,
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
@@ -2599,6 +2611,11 @@ private:
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
Tablerec* regTabPtr);
+
+ int executeTuxMoveTriggers(Signal* signal,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr);
int executeTuxDeleteTriggers(Signal* signal,
Operationrec* regOperPtr,
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2008-03-25 23:47:04 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2008-04-24 17:05:04 +08:00
@@ -206,6 +206,36 @@ Dbtup::commit_operation(Signal* signal,
{
ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
+ if(regOperPtr->op_struct.op_type == ZMOVE &&
+ !regOperPtr->m_move_tuple_location.isNull()) {
+ Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
+ PagePtr tmp_page;
+ Local_key tmp_key;
+ Tuple_header* move_tuple_ptr = (Tuple_header*)
+ get_ptr(&tmp_page, &regOperPtr->m_move_tuple_location, regTabPtr);
+ memcpy(move_tuple_ptr, tuple_ptr, 4*fixsize);
+ tmp_key.m_page_no = tmp_page.p->frag_page_id;
+ tmp_key.m_page_idx = regOperPtr->m_move_tuple_location.m_page_idx;
+ /**
+ * update acc key after we already finished moving fixpart
+ */
+ c_lqh->move_accminupdate(signal,
+ regOperPtr->userpointer,
+ &tmp_key);
+ /**
+ * delete and release fixpart of the original tuple
+ */
+ free_fix_rec(regFragPtr, regTabPtr,
+ &regOperPtr->m_tuple_location, (Fix_page*)pagePtr.p);
+ regOperPtr->m_tuple_location = regOperPtr->m_move_tuple_location;
+ regOperPtr->m_move_tuple_location.setNull();
+
+ return;
+ }
+
+ if(regOperPtr->op_struct.op_type == ZMOVE)
+ return;
+
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
Uint32 save= tuple_ptr->m_operation_ptr_i;
Uint32 bits= tuple_ptr->m_header_bits;
@@ -754,7 +784,8 @@ Dbtup::set_commit_change_mask_info(const
Uint32 masklen = (regTabPtr->m_no_of_attributes + 31) >> 5;
if (regOperPtr->m_copy_tuple_location.isNull())
{
- ndbassert(regOperPtr->op_struct.op_type == ZDELETE);
+ ndbassert(regOperPtr->op_struct.op_type == ZDELETE ||
+ regOperPtr->op_struct.op_type == ZMOVE);
req_struct->changeMask.set();
}
else
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-03-28 00:04:19 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2008-04-24 17:05:06 +08:00
@@ -642,6 +642,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal
regOperPtr->storedProcedureId= Rstoredid;
regOperPtr->m_copy_tuple_location.setNull();
+ regOperPtr->m_move_tuple_location.setNull();
regOperPtr->tupVersion= ZNIL;
sig1= tupKeyReq->savePointId;
@@ -846,6 +847,49 @@ void Dbtup::execTUPKEYREQ(Signal* signal
sendTUPKEYCONF(signal, &req_struct, regOperPtr);
return;
}
+ else if(Roptype == ZMOVE)
+ {
+ jam();
+ if (handleMoveReq(signal, regOperPtr, regFragPtr, regTabPtr,
+ &req_struct, disk_page != RNIL) == -1) {
+ return;
+ }
+ /*
+ * if move fixpart, then should add a new entry of ordered index
+ */
+ if (req_struct.optimize_options & AttributeHeader::OPTIMIZE_MOVE_FIXPART &&
+ !regOperPtr->m_move_tuple_location.isNull()) {
+ /*
+ * first copy tuple from old location to new,
+ * otherwise we cannot update ordered index
+ */
+ const Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
+ PagePtr tmp_page, tmp_page1;
+ Tuple_header* move_tuple_ptr = (Tuple_header*)
+ get_ptr(&tmp_page1, &regOperPtr->m_move_tuple_location, regTabPtr);
+ memcpy(move_tuple_ptr, req_struct.m_tuple_ptr, 4*fixsize);
+
+ if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
+ jam();
+ if (executeTuxMoveTriggers(signal,
+ regOperPtr,
+ regFragPtr,
+ regTabPtr) != 0) {
+ jam();
+ /*
+ * See insert case.
+ */
+ signal->theData[0] = operPtr.i;
+ do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
+ tupkeyErrorLab(signal);
+ return;
+ }
+ }
+ }
+
+ sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+ return;
+ }
else
{
ndbrequire(false); // Invalid op type
@@ -1057,8 +1101,6 @@ int Dbtup::handleUpdateReq(Signal* signa
tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
operPtrP->tupVersion= tup_version;
- req_struct->optimize_options = 0;
-
if (!req_struct->interpreted_exec) {
jam();
int retValue = updateAttributes(req_struct,
@@ -1076,23 +1118,6 @@ int Dbtup::handleUpdateReq(Signal* signa
change_mask_ptr,
req_struct->changeMask.rep.data);
- switch (req_struct->optimize_options) {
- case AttributeHeader::OPTIMIZE_MOVE_VARPART:
- /**
- * optimize varpart of tuple, move varpart of tuple from
- * big-free-size page list into small-free-size page list
- */
- if(base->m_header_bits & Tuple_header::VAR_PART)
- optimize_var_part(req_struct, base, operPtrP,
- regFragPtr, regTabPtr);
- break;
- case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
- //TODO: move fix part of tuple
- break;
- default:
- break;
- }
-
if (regTabPtr->need_shrink())
{
shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
@@ -1115,6 +1140,143 @@ int Dbtup::handleUpdateReq(Signal* signa
error:
tupkeyErrorLab(signal);
+ return -1;
+}
+
+/* ---------------------------------------------------------------- */
+/* ----------------------------- MOVE ----------------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::handleMoveReq(Signal* signal,
+ Operationrec* operPtrP,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr,
+ KeyReqStruct* req_struct,
+ bool disk)
+{
+ /*
+ * For the MOVE operation we assume:
+ * 1. there are never multiple MOVE operations on one tuple in a transaction
+ * 2. MOVE does not need to support interpreted execution
+ * 3. triggers before/after MOVE are not supported for now
+ * 4. to keep things simple enough,
+ * for now a MOVE operation cannot be mixed with other operations
+ * (select, update, insert, delete) in a transaction
+ * 5. TODO:
+ * need to support NR/SR and Abort
+ */
+ Tuple_header *org = req_struct->m_tuple_ptr;
+ disk = disk ||
+ (org->m_header_bits & Tuple_header::DISK_INLINE);
+ /* now we only support moving tuple based on memory */
+ if (unlikely(disk))
+ return 0;
+
+ /**
+ * Check consistency before move
+ */
+ if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
+ (calculateChecksum(org, regTabPtr) != 0))
+ {
+ terrorCode= ZTUPLE_CORRUPTED_ERROR;
+ goto error;
+ }
+
+ operPtrP->tupVersion= org->get_tuple_version();
+
+ req_struct->optimize_options = 0;
+ req_struct->optimize_row_id.setNull();
+
+ if (!req_struct->interpreted_exec) {
+ jam();
+ int retValue = updateAttributes(req_struct,
+ &cinBuffer[0],
+ req_struct->attrinfo_len);
+ if (unlikely(retValue == -1))
+ goto error;
+ } else {
+ ndbrequire(false);
+ goto error;
+ }
+
+ if (req_struct->optimize_options) {
+ if (req_struct->optimize_options & AttributeHeader::OPTIMIZE_MOVE_VARPART) {
+ /**
+ * optimize varpart of tuple, move varpart of tuple from
+ * big-free-size page list into small-free-size page list
+ */
+ if(org->m_header_bits & Tuple_header::VAR_PART) {
+ optimize_var_part(req_struct, org, operPtrP,
+ regFragPtr, regTabPtr);
+ }
+ }
+ if (req_struct->optimize_options & AttributeHeader::OPTIMIZE_MOVE_FIXPART) {
+ Uint32 move_page_no = req_struct->optimize_row_id.m_page_no;
+ Uint32 move_page_idx = req_struct->optimize_row_id.m_page_idx;
+
+ /*
+ * before move, we should check for the move destination
+ * main two reasons:
+ * 1. the destination is already occupied by a new insertion
+ * 2. the destination page is already released and is invalid
+ *
+ * we just check whether destination rowid is valid
+ * if optimize_row_id is invalid, just ignore it;
+ */
+ Uint32 real_pid = RNIL;
+ if ((real_pid = getRealpidCheck(regFragPtr, move_page_no))
+ == RNIL) {
+ /*
+ * if the page is not valid, we ignore it and do not move the fixpart
+ */
+ operPtrP->m_move_tuple_location.setNull();
+ } else {
+ Ptr<Page> pagePtr;
+ c_page_pool.getPtr(pagePtr, real_pid); /* get real page pointer */
+ Fix_page* pageP = (Fix_page*)pagePtr.p;
+
+ const Uint32 rec_size = regTabPtr->m_offsets[MM].m_fix_header_size;
+ /*
+ * validate the record is free, if it's free, then allocate it
+ */
+ if (pageP->m_page_header.m_page_type == File_formats::PT_Tup_fixsize_page &&
+ pageP->page_state == ZTH_MM_FREE &&
+ pageP->frag_page_id == move_page_no &&
+ pageP->physical_page_id == real_pid &&
+ pageP->free_space > 0 &&
+ pageP->free_space < (Fix_page::DATA_WORDS)/rec_size ) {
+ if (move_page_idx % rec_size == 0 &&
+ move_page_idx + 1 < Fix_page::DATA_WORDS &&
+ pageP->m_data[move_page_idx + 1] == Fix_page::FREE_RECORD &&
+ pageP->alloc_record(move_page_idx) == move_page_idx) {
+ /*
+ * use m_move_tuple_location to save the destination rowid for moving the fixpart
+ */
+ operPtrP->m_move_tuple_location.m_page_no = real_pid;
+ operPtrP->m_move_tuple_location.m_page_idx = move_page_idx;
+ } else {
+ /*
+ * if the record is not valid, we ignore it and do not move the fixpart
+ */
+ operPtrP->m_move_tuple_location.setNull();
+ }
+ } else {
+ /*
+ * if the page is not free, we ignore it and do not move the fixpart
+ */
+ operPtrP->m_move_tuple_location.setNull();
+ }
+ }
+ }
+ }
+
+ if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
+ jam();
+ setChecksum(org, regTabPtr);
+ }
+ return 0;
+
+error:
+ tupkeyErrorLab(signal);
return -1;
}
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-04-03 20:30:13 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp 2008-04-24 17:05:17 +08:00
@@ -1557,13 +1557,15 @@ int Dbtup::updateAttributes(KeyReqStruct
{
jam();
Uint32 sz= ahIn.getDataSize();
- ndbrequire(sz == 1);
+ ndbrequire(sz == 2);
/**
* get optimize options
*/
- req_struct->optimize_options = * (inBuffer + inBufIndex + 1);
- req_struct->optimize_options &=
- AttributeHeader::OPTIMIZE_OPTIONS_MASK;
+ Uint64 options = 0;
+ memcpy(&options, inBuffer+inBufIndex+1, sz << 2);
+ req_struct->optimize_options = options & AttributeHeader::OPTIMIZE_OPTIONS_MASK;
+ req_struct->optimize_row_id.m_page_no = options >> 32;
+ req_struct->optimize_row_id.m_page_idx = (options & 0xFFFF0000) >> 16;
inBufIndex += 1 + sz;
req_struct->in_buf_index = inBufIndex;
}
@@ -2222,6 +2224,10 @@ Dbtup::read_pseudo(const Uint32 * inBuff
}
case AttributeHeader::ROW_SIZE:
outBuffer[1] = tabptr.p->m_offsets[MM].m_fix_header_size << 2;
+ sz = 1;
+ break;
+ case AttributeHeader::FIXPAGE_SIZE:
+ outBuffer[1] = Fix_page::DATA_WORDS; /* in words */
sz = 1;
break;
case AttributeHeader::ROW_COUNT:
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2008-03-25 23:47:04 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2008-04-24 17:05:22 +08:00
@@ -119,6 +119,9 @@ Dbtup::execACC_SCANREQ(Signal* signal)
ndbrequire((bits & ScanOp::SCAN_DD) == 0);
ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
}
+
+ if (AccScanReq::getDescendingFlag(req->requestInfo))
+ bits |= ScanOp::SCAN_DESCEND;
// set up scan op
new (scanPtr.p) ScanOp();
@@ -568,9 +571,21 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scan
scan.m_state = ScanOp::Last;
return;
}
+ key.m_page_idx = 0;
if (! (bits & ScanOp::SCAN_DD)) {
key.m_file_no = ZNIL;
- key.m_page_no = 0;
+ if ((bits & ScanOp::SCAN_DESCEND) && frag.noOfPages > 0) {
+ key.m_page_no = frag.m_max_page_no - 1;
+ TablerecPtr tablePtr;
+ tablePtr.i = scan.m_tableId;
+ ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+ /* Now we only support descending tuple scan on mem table */
+ Uint32 fixpart_size = (*tablePtr.p).m_offsets[0].m_fix_header_size;
+ const Uint32 page_size = Fix_page::DATA_WORDS;
+ /* set page_idx to maximal page index in a fixsize page */
+ key.m_page_idx = page_size - (page_size % fixpart_size) - fixpart_size;
+ } else
+ key.m_page_no = 0;
pos.m_get = ScanPos::Get_page_mm;
// for MM scan real page id is cached for efficiency
pos.m_realpid_mm = RNIL;
@@ -588,7 +603,6 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scan
key.m_page_no = ext->m_first_page_no;
pos.m_get = ScanPos::Get_page_dd;
}
- key.m_page_idx = 0;
// let scanNext() do the work
scan.m_state = ScanOp::Next;
}
@@ -622,6 +636,9 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size;
+ const Uint32 page_size = Fix_page::DATA_WORDS;
+ const Uint16 max_page_idx = page_size - (page_size % size) - size;
+ const bool descending = bits & ScanOp::SCAN_DESCEND;
if (lcp && lcp_list != RNIL)
goto found_lcp_keep;
@@ -630,7 +647,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
case ScanPos::Get_next_tuple:
case ScanPos::Get_next_tuple_fs:
jam();
- key.m_page_idx += size;
+ key.m_page_idx = descending ? key.m_page_idx - size : key.m_page_idx + size;
// fall through
case ScanPos::Get_tuple:
case ScanPos::Get_tuple_fs:
@@ -671,8 +688,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
// move to next logical TUP page
jam();
{
- key.m_page_no++;
- if (key.m_page_no >= frag.m_max_page_no) {
+ if (descending ? --key.m_page_no == (Uint32)-1 :
+ ++key.m_page_no >= frag.m_max_page_no) {
jam();
if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
@@ -691,7 +708,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
return true;
}
cont:
- key.m_page_idx = 0;
+ key.m_page_idx = descending ? max_page_idx : 0;
pos.m_get = ScanPos::Get_page_mm;
// clear cached value
pos.m_realpid_mm = RNIL;
@@ -867,7 +884,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
// move to next fixed size tuple
jam();
{
- key.m_page_idx += size;
+ key.m_page_idx = descending ? key.m_page_idx - size : key.m_page_idx + size;
pos.m_get = ScanPos::Get_tuple_fs;
}
/*FALLTHRU*/
@@ -877,7 +894,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
jam();
{
Fix_page* page = (Fix_page*)pos.m_page;
- if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
+ if (descending ? key.m_page_idx != (Uint16)(0 - size) :
+ key.m_page_idx + size <= Fix_page::DATA_WORDS)
{
pos.m_get = ScanPos::Get_next_tuple_fs;
#ifdef VM_TRACE
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2008-03-19 20:56:57 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2008-04-24 17:05:23 +08:00
@@ -543,6 +543,9 @@ void Dbtup::checkDetachedTriggers(KeyReq
regTablePtr->subscriptionUpdateTriggers,
regOperPtr, disk);
break;
+ case(ZMOVE):
+ jam();
+ break;
default:
ndbrequire(false);
break;
@@ -1136,6 +1139,22 @@ Dbtup::executeTuxUpdateTriggers(Signal*
}
int
+Dbtup::executeTuxMoveTriggers(Signal* signal,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr)
+{
+ TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
+ req->tableId = regFragPtr->fragTableId;
+ req->fragId = regFragPtr->fragmentId;
+ req->pageId = regOperPtr->m_move_tuple_location.m_page_no;
+ req->pageIndex = regOperPtr->m_move_tuple_location.m_page_idx;
+ req->tupVersion = regOperPtr->tupVersion;
+ req->opInfo = TuxMaintReq::OpAdd;
+ return addTuxEntries(signal, regOperPtr, regTabPtr);
+}
+
+int
Dbtup::addTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
@@ -1225,6 +1244,13 @@ Dbtup::executeTuxCommitTriggers(Signal*
return;
jam();
tupVersion= regOperPtr->tupVersion;
+ } else if (regOperPtr->op_struct.op_type == ZMOVE) {
+ tupVersion= regOperPtr->tupVersion;
+ /*
+ * if only the varpart was moved, we should not remove ordered index entries
+ */
+ if (regOperPtr->m_move_tuple_location.isNull())
+ return;
} else {
ndbrequire(false);
tupVersion= 0; // remove warning
@@ -1255,6 +1281,9 @@ Dbtup::executeTuxAbortTriggers(Signal* s
jam();
tupVersion = regOperPtr->tupVersion;
} else if (regOperPtr->op_struct.op_type == ZDELETE) {
+ jam();
+ return;
+ } else if (regOperPtr->op_struct.op_type == ZMOVE) {
jam();
return;
} else {
diff -Nrup a/storage/ndb/src/ndbapi/NdbBlob.cpp b/storage/ndb/src/ndbapi/NdbBlob.cpp
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp 2008-04-08 16:29:10 +08:00
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp 2008-04-24 17:05:24 +08:00
@@ -441,6 +441,7 @@ NdbBlob::isKeyOp()
return
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
+ theNdbOp->theOperationType == NdbOperation::MoveRequest ||
theNdbOp->theOperationType == NdbOperation::WriteRequest ||
theNdbOp->theOperationType == NdbOperation::ReadRequest ||
theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
@@ -497,6 +498,7 @@ NdbBlob::isReadOnlyOp()
return ! (
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
+ theNdbOp->theOperationType == NdbOperation::MoveRequest ||
theNdbOp->theOperationType == NdbOperation::WriteRequest
);
}
diff -Nrup a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2008-04-08 23:08:06 +08:00
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2008-04-24 17:05:25 +08:00
@@ -405,8 +405,13 @@ NdbColumnImpl::create_pseudo(const char
col->m_impl.m_attrSize = 4;
col->m_impl.m_arraySize = 2;
} else if(!strcmp(name, "NDB$OPTIMIZE")){
- col->setType(NdbDictionary::Column::Unsigned);
+ col->setType(NdbDictionary::Column::Bigunsigned);
col->m_impl.m_attrId = AttributeHeader::OPTIMIZE;
+ col->m_impl.m_attrSize = 8;
+ col->m_impl.m_arraySize = 1;
+ } else if(!strcmp(name, "NDB$FIXPAGE_SIZE")){
+ col->setType(NdbDictionary::Column::Unsigned);
+ col->m_impl.m_attrId = AttributeHeader::FIXPAGE_SIZE;
col->m_impl.m_attrSize = 4;
col->m_impl.m_arraySize = 1;
} else {
@@ -1198,13 +1203,36 @@ NdbIndexImpl::getIndexTable() const
*/
NdbOptimizeTableHandleImpl::NdbOptimizeTableHandleImpl(NdbDictionary::OptimizeTableHandle &f)
- : NdbDictionary::OptimizeTableHandle(* this),
- m_state(NdbOptimizeTableHandleImpl::CREATED),
- m_ndb(NULL), m_table(NULL),
+ : NdbDictionary::OptimizeTableHandle(* this), MAX_LIST_ELMT_NUM(32),
+ m_state(NdbOptimizeTableHandleImpl::CREATED), m_ndb(NULL), m_table(NULL),
m_table_queue(NULL), m_table_queue_first(NULL), m_table_queue_end(NULL),
- m_trans(NULL), m_scan_op(NULL),
+ SIZE_NULL((Uint32)-1), m_row_size(0), m_fixpage_size(0),
+ m_forward_trans(NULL), m_forward_scan_op(NULL),
+ m_backward_trans(NULL), m_backward_scan_op(NULL),
m_facade(this)
{
+ m_forward_scan_status.init();
+ m_backward_scan_status.init();
+
+ m_forward_extraGets[0].column = NdbDictionary::Column::FRAGMENT;
+ m_forward_extraGets[0].appStorage = 0;
+ m_forward_extraGets[0].recAttr = 0;
+ m_forward_extraGets[1].column = NdbDictionary::Column::ROWID;
+ m_forward_extraGets[1].appStorage = 0;
+ m_forward_extraGets[1].recAttr = 0;
+ m_forward_extraGets[2].column = NdbDictionary::Column::ROW_SIZE;
+ m_forward_extraGets[2].appStorage = 0;
+ m_forward_extraGets[2].recAttr = 0;
+ m_forward_extraGets[3].column = NdbDictionary::Column::FIXPAGE_SIZE;
+ m_forward_extraGets[3].appStorage = 0;
+ m_forward_extraGets[3].recAttr = 0;
+
+ m_backward_extraGets[0].column = NdbDictionary::Column::FRAGMENT;
+ m_backward_extraGets[0].appStorage = 0;
+ m_backward_extraGets[0].recAttr = 0;
+ m_backward_extraGets[1].column = NdbDictionary::Column::ROWID;
+ m_backward_extraGets[1].appStorage = 0;
+ m_backward_extraGets[1].recAttr = 0;
}
NdbOptimizeTableHandleImpl::~NdbOptimizeTableHandleImpl()
@@ -1214,65 +1242,231 @@ NdbOptimizeTableHandleImpl::~NdbOptimize
DBUG_VOID_RETURN;
}
+void NdbOptimizeTableHandleImpl::close_trans_op()
+{
+ if (m_forward_scan_op) {
+ m_forward_scan_op->close();
+ m_forward_scan_op = NULL;
+ }
+ if (m_forward_trans) {
+ m_ndb->closeTransaction(m_forward_trans);
+ m_forward_trans = NULL;
+ }
+ if (m_backward_scan_op) {
+ m_backward_scan_op->close();
+ m_backward_scan_op = NULL;
+ }
+ if (m_backward_trans) {
+ m_ndb->closeTransaction(m_backward_trans);
+ m_backward_trans = NULL;
+ }
+}
+
+int NdbOptimizeTableHandleImpl::init_page_row_size()
+{
+ DBUG_ENTER("NdbOptimizeTableImpl::init_page_row_size()");
+ int noRetries = 100;
+ if (m_table_queue) {
+ const NdbTableImpl * table = m_table_queue->table;
+
+ while (noRetries-- > 0) {
+ NdbSleep_MilliSleep(50);
+ close_trans_op();
+ /*
+ * Start forward scan to get size of fix page
+ * and size of one row in a fix page;
+ * for each table, we should get the size
+ */
+ if (!m_forward_trans)
+ m_forward_trans = m_ndb->startTransaction();
+ if (!m_forward_trans) {
+ if (m_ndb->getNdbError().status == NdbError::TemporaryError)
+ continue; /* next retry */
+ break; /* goto do_error */
+ }
+
+ NdbScanOperation::ScanOptions forward_scan_options;
+ forward_scan_options.optionsPresent =
+ NdbScanOperation::ScanOptions::SO_GETVALUE;
+
+ forward_scan_options.extraGetValues = &m_forward_extraGets[0];
+ forward_scan_options.numExtraGetValues = 4;
+
+ if ((m_forward_scan_op =
+ m_forward_trans->scanTable(table->m_ndbrecord,
+ NdbOperation::LM_CommittedRead,
+ (const unsigned char *)NdbDictionaryImpl::m_emptyMask,
+ &forward_scan_options,
+ sizeof(NdbScanOperation::ScanOptions)))
+ == NULL) {
+ break; /* goto do_error */
+ }
+
+ /* need only get one tuple to get row size and fixpage size */
+ m_forward_recAttrRowsize = m_forward_extraGets[2].recAttr;
+ m_forward_recAttrFixpagesize = m_forward_extraGets[3].recAttr;
+
+ /* to commit to get row_size & fixpage_size */
+ if (m_forward_trans->execute(NdbTransaction::NoCommit) != 0) {
+ if (m_forward_trans->getNdbError().status == NdbError::TemporaryError)
+ continue; /* next retry */
+ m_ndb->getNdbError(m_forward_trans->getNdbError().code);
+ break; /* goto do_error */
+ }
+
+ const char* dummyOutRowPtr;
+ int check = 0;
+ if ((check = m_forward_scan_op->nextResult(&dummyOutRowPtr,
+ true,
+ false)) == 0) {
+ m_row_size = m_forward_recAttrRowsize->u_32_value() >> 2; /* into words */
+ m_fixpage_size = m_forward_recAttrFixpagesize->u_32_value();
+ } else if (check == -1) {
+ if (m_forward_trans->getNdbError().status == NdbError::TemporaryError) {
+ if (noRetries > 0)
+ continue; /* next retry */
+ }
+ m_ndb->getNdbError(m_forward_trans->getNdbError().code);
+ break; /* goto do_error */
+ } else {
+ /* the table is empty, no rows */
+ m_row_size = m_fixpage_size = SIZE_NULL;
+ }
+
+ close_trans_op();
+ DBUG_RETURN(0); /* ok! initialized size of page and row */
+ }
+ }
+/*do_error: */
+ close_trans_op();
+ DBUG_PRINT("info", ("NdbOptimizeTableImpl::init_page_row_size() failed"));
+ DBUG_RETURN(-1);
+}
+
int NdbOptimizeTableHandleImpl::start()
{
int noRetries = 100;
- DBUG_ENTER("NdbOptimizeTableImpl::start");
+ DBUG_ENTER("NdbOptimizeTableImpl::start()");
- if (m_table_queue)
- {
+ if (m_table_queue) {
const NdbTableImpl * table = m_table_queue->table;
+ if (m_row_size == 0 && m_fixpage_size == 0) {
+ if (init_page_row_size())
+ goto do_error;
+ else if (m_row_size == SIZE_NULL && m_fixpage_size == SIZE_NULL) {
+ /* empty table after init_page_row_size() */
+ m_state = NdbOptimizeTableHandleImpl::INITIALIZED;
+ DBUG_RETURN(0);
+ }
+ }
+
+ m_forward_scan_status.free_list();
+ m_forward_scan_status.init();
+ m_backward_scan_status.init();
/*
- * Start/Restart transaction
+ * Start/Restart forward and backward transactions
*/
- while (noRetries-- > 0)
- {
- if (m_trans && (m_trans->restart() != 0))
- {
- m_ndb->closeTransaction(m_trans);
- m_trans = NULL;
+ while (noRetries-- > 0) {
+ NdbSleep_MilliSleep(50);
+ close_trans_op();
+ if (!m_forward_trans)
+ m_forward_trans = m_ndb->startTransaction();
+ if (!m_forward_trans) {
+ if (m_ndb->getNdbError().status == NdbError::TemporaryError) {
+ if (noRetries > 0)
+ continue; /* next retry */
+ }
+ goto do_error;
}
- else
- m_trans = m_ndb->startTransaction();
- if (!m_trans)
- {
- if (noRetries == 0)
- goto do_error;
- continue;
+
+ if (!m_backward_trans)
+ m_backward_trans = m_ndb->startTransaction();
+ if (!m_backward_trans) {
+ if (m_ndb->getNdbError().status == NdbError::TemporaryError) {
+ if (noRetries > 0)
+ continue; /* next retry */
+ }
+ goto do_error;
}
-
+
/*
* Get first scan operation
- */
- if ((m_scan_op = m_trans->getNdbScanOperation(table->m_facade))
- == NULL)
- {
- m_ndb->getNdbError(m_trans->getNdbError().code);
+ * LM_Exclusive means that key information will be available
+ * for subsequent lock takeover operations.
+ */
+ NdbScanOperation::ScanOptions forward_scan_options, backward_scan_options;
+ forward_scan_options.optionsPresent = backward_scan_options.optionsPresent =
+ NdbScanOperation::ScanOptions::SO_SCANFLAGS |
+ NdbScanOperation::ScanOptions::SO_PARALLEL |
+ NdbScanOperation::ScanOptions::SO_GETVALUE;
+ forward_scan_options.extraGetValues = &m_forward_extraGets[0];
+ forward_scan_options.numExtraGetValues =
+ backward_scan_options.numExtraGetValues = 2;
+ forward_scan_options.scan_flags = NdbScanOperation::SF_TupScan;
+ backward_scan_options.extraGetValues = &m_backward_extraGets[0];
+ backward_scan_options.scan_flags = NdbScanOperation::SF_TupScan |
+ NdbScanOperation::SF_Descending;
+ /*
+ * scan tuples in fragment one by one,
+ * so we can easily make sure a tuple is only moved within the same fragment
+ */
+ forward_scan_options.parallel = backward_scan_options.parallel = 1;
+
+
+ if ((m_forward_scan_op =
+ m_forward_trans->scanTable(table->m_ndbrecord,
+ NdbOperation::LM_CommittedRead, /* no lock */
+ (const unsigned char *)NdbDictionaryImpl::m_emptyMask,
+ &forward_scan_options,
+ sizeof(NdbScanOperation::ScanOptions)))
+ == NULL) {
+ m_ndb->getNdbError(m_forward_trans->getNdbError().code);
goto do_error;
}
-
- /**
- * Define a result set for the scan.
- */
- if (m_scan_op->readTuples(NdbOperation::LM_Exclusive)) {
- m_ndb->getNdbError(m_trans->getNdbError().code);
+
+ m_forward_recAttrFragno = m_forward_extraGets[0].recAttr;
+ m_forward_recAttrRowid = m_forward_extraGets[1].recAttr;
+
+ if ((m_backward_scan_op =
+ m_backward_trans->scanTable(table->m_ndbrecord,
+ NdbOperation::LM_Exclusive, /* exclusive lock */
+ (const unsigned char *)NdbDictionaryImpl::m_emptyMask,
+ &backward_scan_options,
+ sizeof(NdbScanOperation::ScanOptions)))
+ == NULL) {
+ m_ndb->getNdbError(m_backward_trans->getNdbError().code);
goto do_error;
}
-
+
+ m_backward_recAttrFragno = m_backward_extraGets[0].recAttr;
+ m_backward_recAttrRowid = m_backward_extraGets[1].recAttr;
+
/**
* Start scan (NoCommit since we are only reading at this stage);
*/
- if (m_trans->execute(NdbTransaction::NoCommit) != 0) {
- if (m_trans->getNdbError().status == NdbError::TemporaryError)
- continue; /* goto next_retry */
- m_ndb->getNdbError(m_trans->getNdbError().code);
+ if (m_forward_trans->execute(NdbTransaction::NoCommit) != 0) {
+ if (m_forward_trans->getNdbError().status == NdbError::TemporaryError) {
+ if (noRetries > 0)
+ continue; /* goto next_retry */
+ }
+ m_ndb->getNdbError(m_forward_trans->getNdbError().code);
goto do_error;
}
- break;
- } // while (noRetries-- > 0)
+
+ if (m_backward_trans->execute(NdbTransaction::NoCommit) != 0) {
+ if (m_backward_trans->getNdbError().status == NdbError::TemporaryError) {
+ if (noRetries > 0)
+ continue; /* goto next_retry */
+ }
+ m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+ goto do_error;
+ }
+
+ break; /* okay, initialization is done */
+ } /* while (noRetries-- > 0) */
m_state = NdbOptimizeTableHandleImpl::INITIALIZED;
- } // if (m_table_queue)
+ } /* if (m_table_queue) */
else
m_state = NdbOptimizeTableHandleImpl::FINISHED;
@@ -1288,27 +1482,24 @@ int NdbOptimizeTableHandleImpl::init(Ndb
DBUG_ENTER("NdbOptimizeTableHandleImpl::init");
NdbDictionary::Dictionary* dict = ndb->getDictionary();
Uint32 sz = table.m_columns.size();
- bool found_varpart = false;
+ bool found_disk = false;
int blob_num = table.m_noOfBlobs;
m_ndb = ndb;
m_table = &table;
/**
- * search whether there are var size columns in the table,
- * in first step, we only optimize var part, then if the
- * table has no var size columns, we do not do optimizing
+ * search whether the table is memory-based,
+ * currently, we only optimize memory table
*/
for (Uint32 i = 0; i < sz; i++) {
const NdbColumnImpl *col = m_table->m_columns[i];
- if (col != 0 && col->m_storageType == NDB_STORAGETYPE_MEMORY &&
- (col->m_dynamic || col->m_arrayType != NDB_ARRAYTYPE_FIXED)) {
- found_varpart= true;
+ if (col != 0 && col->m_storageType == NDB_STORAGETYPE_DISK) {
+ found_disk= true;
break;
}
}
- if (!found_varpart)
- {
+ if (found_disk) {
m_state = NdbOptimizeTableHandleImpl::FINISHED;
DBUG_RETURN(0);
}
@@ -1332,8 +1523,7 @@ int NdbOptimizeTableHandleImpl::init(Ndb
blob_num--;
const NdbTableImpl * blob_table =
(const NdbTableImpl *)dict->getBlobTable(m_table, c.m_attrId);
- if (blob_table)
- {
+ if (blob_table) {
m_table_queue_end = new fifo_element_st(blob_table, m_table_queue_end);
}
}
@@ -1345,100 +1535,285 @@ int NdbOptimizeTableHandleImpl::init(Ndb
int NdbOptimizeTableHandleImpl::next()
{
- int noRetries = 100;
- int done, check;
DBUG_ENTER("NdbOptimizeTableHandleImpl::next");
+ int noRetries = 100;
+ const char* dummyOutRowPtr;
if (m_state == NdbOptimizeTableHandleImpl::FINISHED)
- DBUG_RETURN(0);
+ DBUG_RETURN(0); /* all tables are scanned */
else if (m_state != NdbOptimizeTableHandleImpl::INITIALIZED)
DBUG_RETURN(-1);
- while (noRetries-- > 0)
- {
- if ((done = check = m_scan_op->nextResult(true)) == 0)
- {
- do
- {
- /**
- * Get update operation
- */
- NdbOperation * myUpdateOp = m_scan_op->updateCurrentTuple();
- if (myUpdateOp == 0)
- {
- m_ndb->getNdbError(m_trans->getNdbError().code);
+ assert(m_row_size != 0 && m_fixpage_size != 0);
+ /* empty table, then scan next table */
+ if (m_row_size == SIZE_NULL && m_fixpage_size == SIZE_NULL) {
+ fifo_element_st *current = m_table_queue;
+ m_table_queue = current->next;
+ /* Start scan of next table */
+ m_row_size = m_fixpage_size = 0;
+ if (start() != 0)
+ goto do_error;
+ DBUG_RETURN(1);
+ }
+
+ while (noRetries-- > 0) {
+
+ /*
+ * do forward scan (holes producer):
+ * if the list can accommodate more holes, then
+ * scan the table to find holes and store them in the list
+ */
+ if (m_forward_scan_status.list_elements_num < MAX_LIST_ELMT_NUM &&
+ m_forward_scan_status.finished == 0) {
+ int f_check1 = 0, f_check2 = 0;
+ if ((f_check1 = m_forward_scan_op->nextResult(&dummyOutRowPtr,
+ true,
+ false)) == 0) {
+ do {
+ Uint32 frag_no = m_forward_recAttrFragno->u_32_value();
+ Uint32 page_no = m_forward_recAttrRowid->u_32_value();
+ //TODO, different endian issue ?
+ Uint32 page_idx = *(Uint32*)(m_forward_recAttrRowid->aRef() + 4);
+ row_spec current_row(frag_no, page_no, page_idx);
+ row_spec prev_row = m_forward_scan_status.latest_row;
+ /*
+ * ROWID of current row should be larger than ROWID of previous row,
+ * since it is in a forward scan now.
+ */
+ if (! prev_row.is_null()) {
+ if (prev_row.frag_no == current_row.frag_no)
+ assert(current_row > prev_row);
+ else {
+ /* we already set parallel = 1, so fragment numbers
+ * go from 0 to 1, 2, ... */
+ assert(current_row.frag_no > prev_row.frag_no);
+ }
+ }
+
+ m_forward_scan_status.latest_row = current_row;
+
+ /* find holes, and store holes rowid into lists */
+ if (! current_row.is_neighbor(prev_row, m_row_size, m_fixpage_size)) {
+ /* maximal page index in a fixsize page */
+ Uint32 max_idx =
+ m_fixpage_size - (m_fixpage_size % m_row_size) - m_row_size;
+ if (prev_row.is_null() ||
+ (!prev_row.is_null() && current_row.frag_no != prev_row.frag_no))
{
+ assert(current_row.page_idx > 0);
+ row_spec hole(current_row.frag_no,
+ current_row.page_no,
+ 0);
+ Uint32 holes_num = current_row.page_idx / m_row_size;
+ m_forward_scan_status.add_holes(hole, holes_num);
+ } else { /* in same frag */
+ if (current_row.page_no != prev_row.page_no) {
+ if (prev_row.page_idx < max_idx) {
+ row_spec hole(prev_row.frag_no,
+ prev_row.page_no,
+ prev_row.page_idx + m_row_size);
+ Uint32 holes_num = (max_idx - prev_row.page_idx) / m_row_size;
+ m_forward_scan_status.add_holes(hole, holes_num);
+ }
+ if (current_row.page_idx > 0) {
+ row_spec hole(current_row.frag_no,
+ current_row.page_no,
+ 0);
+ Uint32 holes_num = current_row.page_idx / m_row_size;
+ m_forward_scan_status.add_holes(hole, holes_num);
+ }
+ } else { /* in same page */
+ assert(current_row.page_idx - prev_row.page_idx > m_row_size);
+ row_spec hole(prev_row.frag_no,
+ prev_row.page_no,
+ prev_row.page_idx + m_row_size);
+ Uint32 holes_num =
+ (current_row.page_idx - prev_row.page_idx)/m_row_size - 1;
+ m_forward_scan_status.add_holes(hole, holes_num);
+ }
+ }
+ /* list is full, then suspend forward scan */
+ if (m_forward_scan_status.list_elements_num >= MAX_LIST_ELMT_NUM)
+ break;
+ } /* end if (! current_row.is_neighbor(...)) */
+
+ } while ((f_check2 = m_forward_scan_op->nextResult(&dummyOutRowPtr,
+ false,
+ false)) == 0);
+ } /* end of if nextResult(true, false) */
+
+ if (f_check1 == -1 || f_check2 == -1) {
+ if (m_forward_trans->getNdbError().status == NdbError::TemporaryError
&&
+ noRetries > 0) {
+ if (start() != 0)
+ goto do_error;
+ continue; /* next retry */
+ } else {
+ m_ndb->getNdbError(m_forward_trans->getNdbError().code);
goto do_error;
}
- /**
- * optimize a tuple through doing the update
- * first step, move varpart
- */
- Uint32 options = 0 | AttributeHeader::OPTIMIZE_MOVE_VARPART;
- myUpdateOp->setOptimize(options);
- /**
- * nextResult(false) means that the records
- * cached in the NDBAPI are modified before
- * fetching more rows from NDB.
- */
- } while ((check = m_scan_op->nextResult(false)) == 0);
- }
+ }
+
+ if (f_check1 == 1)
+ m_forward_scan_status.finished = 1; /* forward scan is end */
+
+ } /* end if forward scan */
+
+ int need_backward_scan = 0;
+ int have_holes = (m_forward_scan_status.list_elements_num > 0 ? 1 : 0);
/**
- * Commit when all cached tuple have been updated
+ * when the forward scan is not finished and there are no holes,
+ * we should not do the backward scan; that is, we
+ * need to wait for the forward scan to produce holes.
*/
- if (check != -1)
- check = m_trans->execute(NdbTransaction::Commit);
-
- if (done == 1)
- {
+ if (m_forward_scan_status.finished == 1 || have_holes)
+ need_backward_scan = 1;
+
+ /*
+ * do backward scan:
+ * --holes consumer, i.e. moving fixpart
+ * --moving varpart
+ */
+ if (need_backward_scan) {
+ int b_check1 = 0, b_check2 = 0;
+ if ((b_check1 = m_backward_scan_op->nextResult(&dummyOutRowPtr,
+ true,
+ false)) == 0) {
+ do {
+ /* get current tuple's frag_no and ROWID */
+ Uint32 frag_no = m_backward_recAttrFragno->u_32_value();
+ Uint32 page_no = m_backward_recAttrRowid->u_32_value();
+ //TODO, different endian issue ?
+ Uint32 page_idx = *(Uint32*)(m_backward_recAttrRowid->aRef() + 4);
+ row_spec current_row(frag_no, page_no, page_idx);
+ row_spec prev_row = m_backward_scan_status.latest_row;
+
+ /*
+ * ROWID of current row should be less than ROWID of previous row,
+ * since it is in a backward scan now.
+ */
+ if (! prev_row.is_null()) {
+ if (prev_row.frag_no == current_row.frag_no)
+ assert(prev_row > current_row);
+ else {
+ /* we already set parallel = 1, so fragment numbers
+ * go from 0 to 1, 2, ... */
+ assert(current_row.frag_no > prev_row.frag_no);
+ }
+ }
+
+ m_backward_scan_status.latest_row = current_row;
+ row_spec valid_hole;
+
+ Uint32 optimize_option = 1; /* move varpart */
+
+ if (have_holes) {
+ /* get one hole from the holes list of the forward scan, then consume it */
+ int need_move_fixpart =
+ m_forward_scan_status.get_valid_hole(current_row, valid_hole, m_row_size);
+ /* we got one valid hole */
+ if (need_move_fixpart == 1) {
+ assert(!valid_hole.is_null());
+ optimize_option |= 2; /* move fixpart */
+ }
+ }
+
+ const NdbTableImpl * table = m_table_queue->table;
+ /**
+ * Get update operation which updates nothing,
+ * but has optimize set in OperationOptions
+ */
+ const NdbOperation * myUpdateOp =
+ m_backward_scan_op->moveCurrentTuple(m_backward_trans,
+ table->m_ndbrecord,
+ optimize_option,
+ valid_hole.page_no,
+ valid_hole.page_idx);
+ if (myUpdateOp == 0) {
+ m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+ goto do_error;
+ }
+
+ /*
+ * if the list is empty and the forward scan is not finished,
+ * we need to wait for more holes, so we suspend the
+ * backward scan
+ */
+ if (m_forward_scan_status.list_elements_num == 0 &&
+ m_forward_scan_status.finished == 0)
+ break;
+
+ /**
+ * nextResult(false) means that the records
+ * cached in the NDBAPI are modified before
+ * fetching more rows from NDB.
+ */
+ } while ((b_check2 = m_backward_scan_op->nextResult(&dummyOutRowPtr,
+ false,
+ false)) == 0);
+ if (b_check2 != -1)
+ /*
+ * we commit here in order to release lock ASAP
+ */
+ b_check2 = m_backward_trans->execute(NdbTransaction::Commit,
+ NdbOperation::AbortOnError);
+ if (b_check2 != -1)
+ b_check2 = m_backward_trans->restart();
+ } /* end if nextResult(true, false) */
+
+ if (b_check1 == -1 || b_check2 == -1) {
+ if (m_backward_trans->getNdbError().status == NdbError::TemporaryError
&&
+ noRetries > 0) {
+ if (start() != 0)
+ goto do_error;
+ continue; /* next retry */
+ } else {
+ m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+ goto do_error;
+ }
+ }
+
+ if (b_check1 == 1)
+ m_backward_scan_status.finished = 1;
+ } /* end if backwarad scan */
+
+ if (m_backward_scan_status.finished == 1) {
+ /**
+ * Commit when all cached tuple have been updated
+ */
+ int check = m_backward_trans->execute(NdbTransaction::Commit,
+ NdbOperation::AbortOnError);
+ if (check == -1) {
+ if (m_backward_trans->getNdbError().status == NdbError::TemporaryError
&&
+ noRetries > 0) {
+ if (start() != 0)
+ goto do_error;
+ continue; /* next retry */
+ } else {
+ m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+ goto do_error;
+ }
+ }
DBUG_PRINT("info", ("Done with table %s",
m_table_queue->table->getName()));
/*
- * We are done with optimizing current table
- * move to next
+ * It's done for optimizing current table,
+ * close scan operation and move to next table
*/
fifo_element_st *current = m_table_queue;
m_table_queue = current->next;
+
+ m_row_size = m_fixpage_size = 0;
/*
* Start scan of next table
*/
if (start() != 0) {
- m_ndb->getNdbError(m_trans->getNdbError().code);
goto do_error;
}
DBUG_RETURN(1);
}
- if (check == -1)
- {
- if (m_trans->getNdbError().status == NdbError::TemporaryError)
- {
- /*
- * If we encountered temporary error, retry
- */
- m_ndb->closeTransaction(m_trans);
- m_trans = NULL;
- if (start() != 0) {
- m_ndb->getNdbError(m_trans->getNdbError().code);
- goto do_error;
- }
- continue; //retry
- }
- m_ndb->getNdbError(m_trans->getNdbError().code);
- goto do_error;
- }
- if (m_trans->restart() != 0)
- {
- DBUG_PRINT("info", ("Failed to restart transaction"));
- m_ndb->closeTransaction(m_trans);
- m_trans = NULL;
- if (start() != 0) {
- m_ndb->getNdbError(m_trans->getNdbError().code);
- goto do_error;
- }
- }
-
- DBUG_RETURN(1);
- }
+
+ DBUG_RETURN(1); //okay, one next() run okay!
+ } /* end of while (noRetries-- > 0)*/
+
do_error:
DBUG_PRINT("info", ("NdbOptimizeTableHandleImpl::next aborted"));
m_state = NdbOptimizeTableHandleImpl::ABORTED;
@@ -1457,12 +1832,12 @@ int NdbOptimizeTableHandleImpl::close()
delete m_table_queue_first;
m_table_queue_first = next;
}
- m_table_queue = m_table_queue_first = m_table_queue_end = NULL;
- if (m_trans)
- {
- m_ndb->closeTransaction(m_trans);
- m_trans = NULL;
- }
+ /*
+ * release possible holes list in forward scan
+ */
+ m_forward_scan_status.free_list();
+
+ close_trans_op();
m_state = NdbOptimizeTableHandleImpl::CLOSED;
DBUG_RETURN(0);
}
@@ -6889,3 +7264,4 @@ const NdbDictionary::Column * NdbDiction
const NdbDictionary::Column * NdbDictionary::Column::ANY_VALUE = 0;
const NdbDictionary::Column * NdbDictionary::Column::COPY_ROWID = 0;
const NdbDictionary::Column * NdbDictionary::Column::OPTIMIZE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::FIXPAGE_SIZE = 0;
diff -Nrup a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2008-04-02 00:24:32 +08:00
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2008-04-24 17:05:25 +08:00
@@ -322,7 +322,203 @@ class NdbOptimizeTableHandleImpl : publi
{
enum State { CREATED, INITIALIZED, FINISHED, ABORTED, CLOSED };
private:
+ /*
+  * Position of a row: fragment number, page number and index within
+  * the page. A row_spec is "null" when any of its three fields equals
+  * (Uint32)-1; a default-constructed row_spec is null.
+  */
+ struct row_spec {
+   Uint32 frag_no;
+   Uint32 page_no;
+   Uint32 page_idx;
+
+   /* construct a null row */
+   row_spec()
+     : frag_no((Uint32)-1), page_no((Uint32)-1), page_idx((Uint32)-1) {}
+
+   /* construct a row from (fragment no, page no, page index) */
+   row_spec(const Uint32 a, const Uint32 b, const Uint32 c)
+     : frag_no(a), page_no(b), page_idx(c) {}
+
+   /* reset this row to the null state */
+   void set_null() {
+     frag_no = page_no = page_idx = (Uint32)-1;
+   }
+
+   /* true when at least one field still holds the (Uint32)-1 marker */
+   bool is_null() const {
+     return frag_no == (Uint32)-1 ||
+            page_no == (Uint32)-1 ||
+            page_idx == (Uint32)-1;
+   }
+
+   /*
+    * Order two rows by (page_no, page_idx).
+    * we only compare two valid rows in a same fragment
+    */
+   bool operator > (const row_spec & row) const {
+     assert(! row.is_null());
+     assert(! is_null());
+     assert(frag_no == row.frag_no);
+     return page_no > row.page_no ||
+            (page_no == row.page_no && page_idx > row.page_idx);
+   }
+
+   /*
+    * is_neighbor() should be invoked only in forward scan
+    *
+    * Returns true when no row slot lies between prev_row and this row:
+    *  - prev_row is null: this row is in fragment 0 at page index 0
+    *  - same fragment, same page: this index is prev index + row_size
+    *  - same fragment, other page: prev_row is at the last slot
+    *    (max_idx) of its page and this row is at page index 0
+    *  - other fragment: this row is at page index 0
+    *
+    * in forward scan:
+    *   assert frag_no start from 0 to 1, 2, ...
+    *   assert frag_no >= prev_row.frag_no
+    *   assert page_no >= prev_row.page_no
+    *   assert page_idx > prev_row.page_idx
+    */
+   bool is_neighbor(const row_spec & prev_row,
+                    const Uint32 row_size,
+                    const Uint32 page_size) const {
+     if (prev_row.is_null())
+       return frag_no == 0 && page_idx == 0;
+
+     if (frag_no != prev_row.frag_no)
+       return page_idx == 0;
+
+     if (page_no == prev_row.page_no)
+       return page_idx == (prev_row.page_idx + row_size);
+
+     /* a zero row size would make the modulo below undefined */
+     assert(row_size != 0);
+     /* index of the last row slot that fits in a page */
+     const Uint32 max_idx = page_size - (page_size % row_size) - row_size;
+     return prev_row.page_idx == max_idx && page_idx == 0;
+   }
+ }; /* end of struct row_spec */
+
+
+ /*
+  * Progress of one scan: the row most recently returned by the scan
+  * and a flag that is 0 while the scan is running and set to 1 once
+  * it has finished.
+  */
+ struct backward_scan_status {
+   row_spec latest_row;
+   int finished;
+
+   /* start with a null latest row and an unfinished scan */
+   backward_scan_status() : latest_row(), finished(0) {}
+
+   /* return to the freshly constructed state */
+   void init() {
+     latest_row.set_null();
+     finished = 0;
+   }
+ };
+
+ /*
+  * One element of the singly linked list of holes: a run of
+  * holes_num holes beginning at start_hole.
+  */
+ struct fifo_holes_list {
+   /* build an unlinked element (next == NULL) */
+   fifo_holes_list(const row_spec & hole, const Uint32 num)
+     : start_hole(hole), holes_num(num), next(NULL) {}
+
+   row_spec start_hole;
+   Uint32 holes_num;
+   fifo_holes_list * next;
+ };
+
+ const Uint32 MAX_LIST_ELMT_NUM;
+ typedef struct forward_scan_status: public backward_scan_status {
+ private:
+ /* overload default constructors to avoid object copy */
+ forward_scan_status(const forward_scan_status&);
+ forward_scan_status& operator = (const forward_scan_status&);
+ public:
+ /*
+ * Maximal number of elements in holes list
+ * should be less than const MAX_LIST_ELMT_NUM.
+ */
+ Uint32 list_elements_num;
+ fifo_holes_list * holes_list_first;
+ fifo_holes_list * holes_list_end;
+ forward_scan_status():
+ backward_scan_status(),
+ list_elements_num(0),
+ holes_list_first(NULL),
+ holes_list_end(NULL) {};
+ ~forward_scan_status() {
+ free_list();
+ }
+ void init() {
+ backward_scan_status::init();
+ list_elements_num = 0;
+ holes_list_first = holes_list_end = NULL;
+ }
+ void free_list() {
+ fifo_holes_list * now = holes_list_first;
+ Uint32 count = 0;
+ while(now) {
+ count++;
+ now = now->next;
+ }
+ assert(count == list_elements_num);
<