#At file:///home/kgeorge/mysql/bzr/merge-6.0-bugteam/
2855 Georgi Kodinov 2008-10-07 [merge]
merged 6.0-main -> 6.0-bugteam
removed:
libmysql/dll.c
added:
sql/replication.h
sql/rpl_handler.cc
sql/rpl_handler.h
renamed:
mysql-test/suite/falcon_team/r/falcon_bug_23945.result => mysql-test/suite/falcon/r/falcon_bug_23945.result
mysql-test/suite/falcon_team/r/falcon_bug_34892.result => mysql-test/suite/falcon/r/falcon_bug_34892.result
mysql-test/suite/falcon_team/t/falcon_bug_23945.test => mysql-test/suite/falcon/t/falcon_bug_23945.test
mysql-test/suite/falcon_team/t/falcon_bug_34892.test => mysql-test/suite/falcon/t/falcon_bug_34892.test
modified:
include/mysql/plugin.h
include/mysql/plugin.h.pp
libmysql/CMakeLists.txt
libmysql/Makefile.am
libmysqld/CMakeLists.txt
libmysqld/Makefile.am
mysql-test/suite/falcon/r/falcon_bug_34890.result
mysql-test/suite/falcon/r/falcon_online_index.result
mysql-test/suite/falcon/t/falcon_bug_34890.test
mysql-test/suite/falcon/t/falcon_online_index.test
mysql-test/suite/falcon_team/t/disabled.def
mysql-test/suite/falcon_team/t/test2bug.def
sql/CMakeLists.txt
sql/Makefile.am
sql/log.cc
sql/log.h
sql/mysql_priv.h
sql/mysqld.cc
sql/slave.cc
sql/sql_insert.cc
sql/sql_parse.cc
sql/sql_plugin.cc
sql/sql_plugin.h
sql/sql_repl.cc
sql/transaction.cc
storage/falcon/Cache.cpp
storage/falcon/Cache.h
storage/falcon/Database.cpp
storage/falcon/DeferredIndex.cpp
storage/falcon/SRLUpdateIndex.cpp
storage/falcon/SerialLogWindow.cpp
storage/falcon/StorageHandler.cpp
storage/falcon/StorageVersion.h
storage/falcon/Table.cpp
storage/falcon/Transaction.cpp
storage/falcon/Transaction.h
storage/falcon/ha_falcon.cpp
mysql-test/suite/falcon/r/falcon_bug_23945.result
mysql-test/suite/falcon/t/falcon_bug_23945.test
mysql-test/suite/falcon/t/falcon_bug_34892.test
=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h 2008-06-28 11:00:59 +0000
+++ b/include/mysql/plugin.h 2008-09-23 14:33:18 +0000
@@ -16,6 +16,11 @@
#ifndef _my_plugin_h
#define _my_plugin_h
+/* size_t */
+#include <stdlib.h>
+
+typedef struct st_mysql MYSQL;
+
#ifdef __cplusplus
class THD;
class Item;
@@ -66,7 +71,8 @@ typedef struct st_mysql_xid MYSQL_XID;
#define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
#define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
#define MYSQL_AUDIT_PLUGIN 5 /* The Audit plugin type */
-#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
+#define MYSQL_REPLICATION_PLUGIN 6 /* The replication plugin type */
+#define MYSQL_MAX_PLUGIN_TYPE_NUM 7 /* The number of plugin types */
/* We use the following strings to define licenses for plugins */
#define PLUGIN_LICENSE_PROPRIETARY 0
@@ -461,6 +467,19 @@ struct handlerton;
/*************************************************************************
+ API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
+*/
+#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
+
+/**
+ Replication plugin descriptor
+*/
+struct Mysql_replication {
+ int interface_version;
+};
+
+
+/*************************************************************************
st_mysql_value struct for reading values from mysqld.
Used by server variables framework to parse user-provided values.
Will be used for arguments when implementing UDFs.
@@ -613,6 +632,64 @@ void mysql_query_cache_invalidate4(MYSQL
const char *key, unsigned int key_length,
int using_trx);
+/**
+ Get the value of user variable as an integer.
+
+ This function will return the value of variable @a name as an
+ integer. If the original value of the variable is not an integer,
+ the value will be converted into an integer.
+
+ @param name user variable name
+ @param value pointer to return the value
+ @param null_value if not NULL, the function will set it to true if
+ the value of variable is null, set to false if not
+
+ @retval 0 Success
+ @retval 1 Variable not found
+*/
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value);
+
+/**
+ Get the value of user variable as a double precision float number.
+
+ This function will return the value of variable @a name as real
+ number. If the original value of the variable is not a real number,
+ the value will be converted into a real number.
+
+ @param name user variable name
+ @param value pointer to return the value
+ @param null_value if not NULL, the function will set it to true if
+ the value of variable is null, set to false if not
+
+ @retval 0 Success
+ @retval 1 Variable not found
+*/
+int get_user_var_real(const char *name,
+ double *value, int *null_value);
+
+/**
+ Get the value of user variable as a string.
+
+ This function will return the value of variable @a name as
+ string. If the original value of the variable is not a string,
+ the value will be converted into a string.
+
+ @param name user variable name
+ @param value pointer to the value buffer
+ @param len length of the value buffer
+ @param precision precision of the value if it is a float number
+ @param null_value if not NULL, the function will set it to true if
+ the value of variable is null, set to false if not
+
+ @retval 0 Success
+ @retval 1 Variable not found
+*/
+int get_user_var_str(const char *name,
+ char *value, unsigned long len,
+ unsigned int precision, int *null_value);
+
+
#ifdef __cplusplus
}
#endif
=== modified file 'include/mysql/plugin.h.pp'
--- a/include/mysql/plugin.h.pp 2008-09-12 08:58:52 +0000
+++ b/include/mysql/plugin.h.pp 2008-10-07 17:04:28 +0000
@@ -1,3 +1,5 @@
+#include <stdlib.h>
+typedef struct st_mysql MYSQL;
struct st_mysql_lex_string
{
char *str;
@@ -106,6 +108,9 @@ struct st_mysql_storage_engine
int interface_version;
};
struct handlerton;
+struct Mysql_replication {
+ int interface_version;
+};
struct st_mysql_value
{
int (*value_type)(struct st_mysql_value *);
@@ -139,3 +144,10 @@ void thd_get_xid(const void* thd, MYSQL_
void mysql_query_cache_invalidate4(void* thd,
const char *key, unsigned int key_length,
int using_trx);
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value);
+int get_user_var_real(const char *name,
+ double *value, int *null_value);
+int get_user_var_str(const char *name,
+ char *value, unsigned long len,
+ unsigned int precision, int *null_value);
=== modified file 'libmysql/CMakeLists.txt'
--- a/libmysql/CMakeLists.txt 2008-09-09 19:02:38 +0000
+++ b/libmysql/CMakeLists.txt 2008-10-07 17:04:28 +0000
@@ -119,7 +119,7 @@ ADD_LIBRARY(mysqlclient_notls STATIC ${C
ADD_DEPENDENCIES(mysqlclient_notls GenError)
TARGET_LINK_LIBRARIES(mysqlclient_notls)
-ADD_LIBRARY(libmysql SHARED ${CLIENT_SOURCES} dll.c libmysql.def)
+ADD_LIBRARY(libmysql SHARED ${CLIENT_SOURCES} libmysql.def)
IF(WIN32)
SET_TARGET_PROPERTIES(libmysql mysqlclient PROPERTIES COMPILE_FLAGS "-DUSE_TLS")
ENDIF(WIN32)
=== modified file 'libmysql/Makefile.am'
--- a/libmysql/Makefile.am 2007-11-22 11:39:07 +0000
+++ b/libmysql/Makefile.am 2008-09-29 17:47:27 +0000
@@ -31,7 +31,7 @@ include $(srcdir)/Makefile.shared
libmysqlclient_la_SOURCES = $(target_sources)
libmysqlclient_la_LIBADD = $(target_libadd) $(yassl_las)
libmysqlclient_la_LDFLAGS = $(target_ldflags)
-EXTRA_DIST = Makefile.shared libmysql.def dll.c CMakeLists.txt
+EXTRA_DIST = Makefile.shared libmysql.def CMakeLists.txt
noinst_HEADERS = client_settings.h
link_sources:
=== removed file 'libmysql/dll.c'
--- a/libmysql/dll.c 2008-09-01 22:30:06 +0000
+++ b/libmysql/dll.c 1970-01-01 00:00:00 +0000
@@ -1,125 +0,0 @@
-/* Copyright (C) 2000-2004 MySQL AB
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation.
-
- There are special exceptions to the terms and conditions of the GPL as it
- is applied to this software. View the full text of the exception in file
- EXCEPTIONS-CLIENT in the directory of this software distribution.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
-/*
-** Handling initialization of the dll library
-*/
-
-#include <my_global.h>
-#include <my_sys.h>
-#include <my_pthread.h>
-
-static my_bool libmysql_inited=0;
-
-void libmysql_init(void)
-{
- if (libmysql_inited)
- return;
- libmysql_inited=1;
- my_init();
- {
- DBUG_ENTER("libmysql_init");
-#ifdef LOG_ALL
- DBUG_PUSH("d:t:S:O,c::\\tmp\\libmysql.log");
-#else
- if (getenv("LIBMYSQL_LOG") != NULL)
- DBUG_PUSH(getenv("LIBMYSQL_LOG"));
-#endif
- DBUG_VOID_RETURN;
- }
-}
-
-#ifdef __WIN__
-
-static int inited=0,threads=0;
-HINSTANCE NEAR s_hModule; /* Saved module handle */
-DWORD main_thread;
-
-BOOL APIENTRY LibMain(HANDLE hInst,DWORD ul_reason_being_called,
- LPVOID lpReserved)
-{
- switch (ul_reason_being_called) {
- case DLL_PROCESS_ATTACH: /* case of libentry call in win 3.x */
- if (!inited++)
- {
- s_hModule=hInst;
- libmysql_init();
- main_thread=GetCurrentThreadId();
- }
- break;
- case DLL_THREAD_ATTACH:
- threads++;
- my_thread_init();
- break;
- case DLL_PROCESS_DETACH: /* case of wep call in win 3.x */
- if (!--inited) /* Safety */
- {
- /* my_thread_init() */ /* This may give extra safety */
- my_end(0);
- }
- break;
- case DLL_THREAD_DETACH:
- /* Main thread will free by my_end() */
- threads--;
- if (main_thread != GetCurrentThreadId())
- my_thread_end();
- break;
- default:
- break;
- } /* switch */
-
- return TRUE;
-
- UNREFERENCED_PARAMETER(lpReserved);
-} /* LibMain */
-
-
-static BOOL do_libmain;
-int __stdcall DllMain(HANDLE hInst,DWORD ul_reason_being_called,LPVOID lpReserved)
-{
- /*
- Unless environment variable LIBMYSQL_DLLINIT is set, do nothing.
- The environment variable is checked once, during the first call to DllMain()
- (in DLL_PROCESS_ATTACH hook).
- */
- if (ul_reason_being_called == DLL_PROCESS_ATTACH)
- do_libmain = (getenv("LIBMYSQL_DLLINIT") != NULL);
- if (do_libmain)
- return LibMain(hInst,ul_reason_being_called,lpReserved);
- return TRUE;
-}
-
-#elif defined(WINDOWS)
-
-/****************************************************************************
-** This routine is called by LIBSTART.ASM at module load time. All it
-** does in this sample is remember the DLL module handle. The module
-** handle is needed if you want to do things like load stuff from the
-** resource file (for instance string resources).
-****************************************************************************/
-
-int _export FAR PASCAL libmain(HANDLE hModule,short cbHeapSize,
- UCHAR FAR *lszCmdLine)
-{
- s_hModule = hModule;
- libmysql_init();
- return TRUE;
-}
-
-#endif
=== modified file 'libmysqld/CMakeLists.txt'
--- a/libmysqld/CMakeLists.txt 2008-08-08 01:33:43 +0000
+++ b/libmysqld/CMakeLists.txt 2008-09-23 14:33:18 +0000
@@ -204,6 +204,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libm
../sql/ddl_blocker.cc ../sql/si_objects.cc
../sql/event_parse_data.cc ../sql/mdl.cc
../sql/transaction.cc
+ ../sql/rpl_handler.cc
${GEN_SOURCES}
${LIB_SOURCES})
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am 2008-08-08 01:33:43 +0000
+++ b/libmysqld/Makefile.am 2008-09-23 14:33:18 +0000
@@ -81,7 +81,8 @@ sqlsources = derror.cc field.cc field_co
debug_sync.cc sql_tablespace.cc transaction.cc \
rpl_injector.cc my_user.c partition_info.cc \
sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
- event_parse_data.cc mdl.cc
+ event_parse_data.cc mdl.cc \
+ rpl_handler.cc
libmysqld_int_a_SOURCES= $(libmysqld_sources)
nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
=== renamed file 'mysql-test/suite/falcon_team/r/falcon_bug_23945.result' => 'mysql-test/suite/falcon/r/falcon_bug_23945.result'
--- a/mysql-test/suite/falcon_team/r/falcon_bug_23945.result 2008-09-09 08:30:18 +0000
+++ b/mysql-test/suite/falcon/r/falcon_bug_23945.result 2008-10-02 10:37:27 +0000
@@ -15,3 +15,5 @@ SELECT * FROM t1;
ERROR 42S02: Table 'test.t1' doesn't exist
DROP TABLE t1;
ERROR 42S02: Unknown table 't1'
+CREATE TABLE t1(a INT);
+DROP TABLE t1;
=== modified file 'mysql-test/suite/falcon/r/falcon_bug_34890.result'
--- a/mysql-test/suite/falcon/r/falcon_bug_34890.result 2008-03-27 16:50:16 +0000
+++ b/mysql-test/suite/falcon/r/falcon_bug_34890.result 2008-10-06 19:17:21 +0000
@@ -16,7 +16,7 @@ CREATE TABLE t1 (
t1_autoinc INTEGER NOT NULL AUTO_INCREMENT,
t1_uuid CHAR(36),
PRIMARY KEY (t1_autoinc)
-) ENGINE = Falcon;
+);
CREATE PROCEDURE p1 ()
begin
DECLARE my_count INT DEFAULT 0;
=== renamed file 'mysql-test/suite/falcon_team/r/falcon_bug_34892.result' => 'mysql-test/suite/falcon/r/falcon_bug_34892.result'
=== modified file 'mysql-test/suite/falcon/r/falcon_online_index.result'
--- a/mysql-test/suite/falcon/r/falcon_online_index.result 2008-09-10 15:08:56 +0000
+++ b/mysql-test/suite/falcon/r/falcon_online_index.result 2008-10-03 05:15:40 +0000
@@ -72,14 +72,14 @@ affected rows: 0
info: Records: 0 Duplicates: 0 Warnings: 0
#-------- Testing implicit OFFLINE --------#
ALTER TABLE t3 ADD INDEX ix_c (c);
-affected rows: 1000
-info: Records: 1000 Duplicates: 0 Warnings: 0
+affected rows: 0
+info: Records: 0 Duplicates: 0 Warnings: 0
DROP INDEX ix_c ON t3;
affected rows: 0
info: Records: 0 Duplicates: 0 Warnings: 0
ALTER TABLE t3 ADD INDEX ix_cd (c, d);
-affected rows: 1000
-info: Records: 1000 Duplicates: 0 Warnings: 0
+affected rows: 0
+info: Records: 0 Duplicates: 0 Warnings: 0
DROP INDEX ix_cd ON t3;
affected rows: 0
info: Records: 0 Duplicates: 0 Warnings: 0
@@ -108,10 +108,8 @@ a b c d e
ALTER ONLINE TABLE t3 DROP INDEX ix_b;
#-------- ONLINE: ALTER ADD not-null with default --------#
ALTER ONLINE TABLE t3 ADD INDEX ix_c (c);
-ERROR 42000: This version of MySQL doesn't yet support 'ALTER ONLINE TABLE t3 ADD INDEX ix_c (c)'
#-------- ONLINE: ALTER ADD not-null --------#
ALTER ONLINE TABLE t3 ADD INDEX ix_d (d);
-ERROR 42000: This version of MySQL doesn't yet support 'ALTER ONLINE TABLE t3 ADD INDEX ix_d (d)'
#-------- ONLINE: ALTER ADD same key multiple times --------#
ALTER ONLINE TABLE t1 ADD INDEX index_c (c);
ALTER ONLINE TABLE t1 ADD INDEX index_c (c);
@@ -171,6 +169,8 @@ ALTER ONLINE TABLE t3 ADD INDEX ix_asc_b
SHOW INDEXES FROM t3;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment Index_Comment
t3 0 PRIMARY 1 a NULL 500 NULL NULL BTREE
+t3 1 ix_c 1 c NULL 500 NULL NULL BTREE
+t3 1 ix_d 1 d NULL 500 NULL NULL BTREE
t3 1 ix_desc_b 1 b NULL 500 NULL NULL YES BTREE
t3 1 ix_asc_b 1 b NULL 500 NULL NULL YES BTREE
DROP ONLINE INDEX ix_desc_b ON t3;
@@ -265,6 +265,8 @@ t1 1 index_int 1 c NULL 10 NULL NULL YES
SHOW INDEXES FROM t3;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment Index_Comment
t3 0 PRIMARY 1 a NULL 500 NULL NULL BTREE
+t3 1 ix_c 1 c NULL 500 NULL NULL BTREE
+t3 1 ix_d 1 d NULL 500 NULL NULL BTREE
t3 1 index_int 1 b NULL 500 NULL NULL YES BTREE
t3 1 index_multi 1 b NULL 250 NULL NULL YES BTREE
t3 1 index_multi 2 e NULL 500 NULL NULL YES BTREE
@@ -283,6 +285,8 @@ t1 0 PRIMARY 1 a NULL 10 NULL NULL BTRE
SHOW INDEXES FROM t3;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment Index_Comment
t3 0 PRIMARY 1 a NULL 500 NULL NULL BTREE
+t3 1 ix_c 1 c NULL 500 NULL NULL BTREE
+t3 1 ix_d 1 d NULL 500 NULL NULL BTREE
#-------- Test: Combined ADD/DROP INDEX in a single statement --------#
ALTER TABLE t1 ADD INDEX index_int (c);
ALTER TABLE t1 ADD INDEX index_char (d), DROP INDEX index_int;
@@ -344,7 +348,7 @@ a b c d a b c d
16 TestRow16 32 Char16 31 62 32 SomeString 31 for testing
EXPLAIN SELECT * FROM t1, t3 WHERE t3.b=2 AND (t1.c = t3.c OR t1.a=t3.d);
id select_type table type possible_keys key key_len ref rows Extra
-1 SIMPLE t3 ref ix_b ix_b 5 const 100
+1 SIMPLE t3 ref ix_c,ix_d,ix_b ix_b 5 const 100
1 SIMPLE t1 ALL PRIMARY,ix_a,ix_c NULL NULL NULL 20 Range checked for each record (index map: 0xB)
SELECT * FROM t1, t3 WHERE t3.b=2 AND (t1.c = t3.c OR t1.a=t3.d);
a b c d a b c d e
@@ -367,6 +371,8 @@ t2 0 PRIMARY 1 a NULL 16 NULL NULL BTRE
SHOW INDEXES FROM t3;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment Index_Comment
t3 0 PRIMARY 1 a NULL 500 NULL NULL BTREE
+t3 1 ix_c 1 c NULL 500 NULL NULL BTREE
+t3 1 ix_d 1 d NULL 500 NULL NULL BTREE
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
=== renamed file 'mysql-test/suite/falcon_team/t/falcon_bug_23945.test' => 'mysql-test/suite/falcon/t/falcon_bug_23945.test'
--- a/mysql-test/suite/falcon_team/t/falcon_bug_23945.test 2008-09-09 08:30:18 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_23945.test 2008-10-02 10:37:27 +0000
@@ -36,6 +36,9 @@ SELECT * FROM t1;
--error ER_BAD_TABLE_ERROR
DROP TABLE t1;
+CREATE TABLE t1(a INT);
+DROP TABLE t1;
+
# ----------------------------------------------------- #
# --- Check --- #
# ----------------------------------------------------- #
=== modified file 'mysql-test/suite/falcon/t/falcon_bug_34890.test'
--- a/mysql-test/suite/falcon/t/falcon_bug_34890.test 2008-03-27 16:50:16 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_34890.test 2008-10-06 19:17:21 +0000
@@ -1,9 +1,10 @@
--source include/have_falcon.inc
+
#
# Bug #34890: Update Conflict on non-overlapping transactions
-# This test works because it uses FALCON_CONSISTENT_READ=OFF
-# This test is different from 34351_C in that there is no index
-# on t1_uuid.
+# This test works because it uses FALCON_CONSISTENT_READ=OFF
+# This test is different from 34351_C in that there is no index
+# on t1_uuid.
#
--echo *** Bug #34890 ***
@@ -42,9 +43,7 @@ CREATE TABLE t1 (
t1_autoinc INTEGER NOT NULL AUTO_INCREMENT,
t1_uuid CHAR(36),
PRIMARY KEY (t1_autoinc)
-) ENGINE = Falcon;
-
-# declare continue handler for sqlexception
+);
delimiter //;
CREATE PROCEDURE p1 ()
@@ -64,7 +63,6 @@ delimiter ;//
# ----------------------------------------------------- #
# --- Test --- #
# ----------------------------------------------------- #
-
--echo # Switch to connection conn1
connection conn1;
--echo # Send call p1() to the server but do not pull the results
@@ -110,11 +108,9 @@ connection conn4;
--echo # Pull the results of the preceeding call p1() by conn4
--reap
-
--echo # Switch to connection default
connection default;
-
# ----------------------------------------------------- #
# --- Check --- #
# ----------------------------------------------------- #
=== renamed file 'mysql-test/suite/falcon_team/t/falcon_bug_34892.test' => 'mysql-test/suite/falcon/t/falcon_bug_34892.test'
--- a/mysql-test/suite/falcon_team/t/falcon_bug_34892.test 2008-04-20 00:05:17 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_34892.test 2008-10-06 19:17:21 +0000
@@ -1,6 +1,7 @@
--source include/have_falcon.inc
+
#
-# Bug #34892: Some falcon tests fail sporadically
+# Bug #34892: Transaction handling in select_create::abort let's Falcon fail
#
--echo *** Bug #34892 ***
@@ -21,8 +22,6 @@ SET @@autocommit = 0;
--error ER_DUP_ENTRY
CREATE TABLE t1 (PRIMARY KEY (a)) SELECT 1 AS a UNION ALL SELECT 1;
-# This CREATE TABLE provokes a
-# 1005: Can't create table 'test.t1' (errno: 156).
CREATE TABLE t1 (a int);
# ----------------------------------------------------- #
=== modified file 'mysql-test/suite/falcon/t/falcon_online_index.test'
--- a/mysql-test/suite/falcon/t/falcon_online_index.test 2008-09-10 15:08:56 +0000
+++ b/mysql-test/suite/falcon/t/falcon_online_index.test 2008-10-03 05:15:40 +0000
@@ -163,14 +163,12 @@ ALTER ONLINE TABLE t3 DROP INDEX ix_b;
--echo #-------- ONLINE: ALTER ADD not-null with default --------#
-# Test that ALTER ONLINE ... ADD INDEX fails with ONLINE for non-nullable column having default value
---error ER_NOT_SUPPORTED_YET
+# Test that ALTER ONLINE ... ADD INDEX succeeds with ONLINE for non-nullable column having default value
ALTER ONLINE TABLE t3 ADD INDEX ix_c (c);
--echo #-------- ONLINE: ALTER ADD not-null --------#
-# Test that ALTER ONLINE ... ADD INDEX fails with ONLINE for non-nullable columns
---error ER_NOT_SUPPORTED_YET
+# Test that ALTER ONLINE ... ADD INDEX succeeds with ONLINE for non-nullable columns
ALTER ONLINE TABLE t3 ADD INDEX ix_d (d);
--echo #-------- ONLINE: ALTER ADD same key multiple times --------#
=== modified file 'mysql-test/suite/falcon_team/t/disabled.def'
--- a/mysql-test/suite/falcon_team/t/disabled.def 2008-09-10 22:35:51 +0000
+++ b/mysql-test/suite/falcon_team/t/disabled.def 2008-10-02 10:37:27 +0000
@@ -17,4 +17,3 @@
# which should probably be attached to a bug report instead.
# Also please keep the list sorted.
-falcon_bug_23945 : Bug#34892 2008-09-10 hky Test failure brings Falcon's data dictionary out of sync
=== modified file 'mysql-test/suite/falcon_team/t/test2bug.def'
--- a/mysql-test/suite/falcon_team/t/test2bug.def 2008-09-28 07:16:36 +0000
+++ b/mysql-test/suite/falcon_team/t/test2bug.def 2008-10-02 10:37:27 +0000
@@ -20,5 +20,4 @@
falcon_bug_23945: Bug#34892 - Transaction handling in select_create::abort let's Falcon fail
falcon_bug_26433: Bug#39314 - falcon_bug_26433 fails with an offset of 1 in row numbers in expected warnings
falcon_bug_28048: Bug#36700 - Running falcon_bug_28048 shows increasing memory usage and run time
-falcon_bug_34892: Bug#34892 - Transaction handling in select_create::abort let's Falcon fail
falcon_deadlock: Bug#34182 - SELECT ... FOR UPDATE does not lock when in subquery
=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt 2008-09-16 08:34:30 +0000
+++ b/sql/CMakeLists.txt 2008-09-26 16:30:56 +0000
@@ -78,6 +78,7 @@ ADD_EXECUTABLE(mysqld
sql_connect.cc scheduler.cc transaction.cc
ddl_blocker.cc si_objects.cc si_logs.cc
sql_profile.cc event_parse_data.cc mdl.cc
+ rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2008-09-30 05:24:12 +0000
+++ b/sql/Makefile.am 2008-10-01 12:09:26 +0000
@@ -89,7 +89,8 @@ noinst_HEADERS = item.h item_func.h item
sql_partition.h partition_info.h partition_element.h \
probes.h sql_audit.h transaction.h \
contributors.h sql_servers.h ddl_blocker.h \
- si_objects.h si_logs.h sql_plist.h mdl.h records.h
+ si_objects.h si_logs.h sql_plist.h mdl.h records.h \
+ rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -136,7 +137,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.
sql_builtin.cc sql_tablespace.cc partition_info.cc \
sql_servers.cc sql_audit.cc sha2.cc \
ddl_blocker.cc si_objects.cc si_logs.cc \
- event_parse_data.cc mdl.cc transaction.cc
+ event_parse_data.cc mdl.cc transaction.cc \
+ rpl_handler.cc
if HAVE_DTRACE
mysqld_SOURCES += probes.d
=== modified file 'sql/log.cc'
--- a/sql/log.cc 2008-10-06 17:04:15 +0000
+++ b/sql/log.cc 2008-10-07 17:04:28 +0000
@@ -39,6 +39,7 @@
#endif
#include <mysql/plugin.h>
+#include "rpl_handler.h"
#include "si_logs.h"
/* max size of the log message */
@@ -4895,9 +4896,11 @@ err:
}
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
int err=0, fd=log_file.file;
+ if (synced)
+ *synced= 0;
safe_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
@@ -4905,6 +4908,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
{
sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME));
+ if (synced)
+ *synced= 1;
}
return err;
}
@@ -5194,7 +5199,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
if (file == &log_file)
{
- error= flush_and_sync();
+ error= flush_and_sync(0);
if (!error)
{
signal_update();
@@ -5377,8 +5382,15 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
if (file == &log_file) // we are writing to the real log (disk)
{
- if (flush_and_sync())
+ bool synced;
+ if (flush_and_sync(&synced))
goto err;
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, file->pos_in_file, synced))) {
+ goto err;
+ }
+
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
@@ -5639,7 +5651,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE
DBUG_ASSERT(carry == 0);
if (sync_log)
- flush_and_sync();
+ flush_and_sync(0);
return 0; // All OK
}
@@ -5725,7 +5737,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
if (commit_event && commit_event->write(&log_file))
goto err;
- if (flush_and_sync())
+
+ bool synced;
+ if (flush_and_sync(&synced))
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT(););
if (cache->error) // Error on read
@@ -5734,6 +5748,11 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
write_error=1; // Don't give more errors
goto err;
}
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, log_file.pos_in_file, synced)))
+ goto err;
+
signal_update();
}
=== modified file 'sql/log.h'
--- a/sql/log.h 2008-10-01 12:02:28 +0000
+++ b/sql/log.h 2008-10-07 17:04:28 +0000
@@ -459,7 +459,21 @@ public:
bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
void rotate_and_purge(uint flags);
- bool flush_and_sync();
+
+ /**
+ Flush binlog cache and synchronize to disk.
+
+ This function flushes events in binlog cache to binary log file,
+ it will do synchronizing according to the setting of system
+ variable 'sync_binlog'. If file is synchronized, @c synced will
+ be set to 1, otherwise 0.
+
+ @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+ @retval 0 Success
+ @retval other Failure
+ */
+ bool flush_and_sync(bool *synced);
int purge_logs(const char *to_log, bool included,
bool need_mutex, bool need_update_threads,
ulonglong *decrease_log_space);
=== modified file 'sql/mysql_priv.h'
--- a/sql/mysql_priv.h 2008-10-02 14:03:57 +0000
+++ b/sql/mysql_priv.h 2008-10-07 17:04:28 +0000
@@ -140,7 +140,7 @@ char* query_table_status(THD *thd,const
#define WARN_DEPRECATED(Thd,VerHi,VerLo,Old,New) \
do { \
compile_time_assert(MYSQL_VERSION_ID < VerHi * 10000 + VerLo * 100); \
- if (Thd) \
+ if ((Thd) != NULL) \
push_warning_printf((Thd), MYSQL_ERROR::WARN_LEVEL_WARN, \
ER_WARN_DEPRECATED_SYNTAX, \
ER(ER_WARN_DEPRECATED_SYNTAX_WITH_VER), \
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2008-10-07 13:38:49 +0000
+++ b/sql/mysqld.cc 2008-10-07 17:04:28 +0000
@@ -33,6 +33,8 @@
#include "rpl_injector.h"
+#include "rpl_handler.h"
+
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
@@ -1352,6 +1354,7 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
+ delegates_destroy();
xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free();
@@ -3946,6 +3949,13 @@ static int init_server_components()
unireg_abort(1);
}
+ /* initialize delegates for extension observers */
+ if (delegates_init())
+ {
+ sql_print_error("Initialize extension delegates failed");
+ unireg_abort(1);
+ }
+
/* need to configure logging before initializing storage engines */
if (opt_update_log)
{
=== added file 'sql/replication.h'
--- a/sql/replication.h 1970-01-01 00:00:00 +0000
+++ b/sql/replication.h 2008-09-23 15:06:18 +0000
@@ -0,0 +1,461 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ Transaction observer flags.
+*/
+enum Trans_flags {
+ /** Transaction is a real transaction */
+ TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+ Transaction observer parameter
+*/
+typedef struct Trans_param {
+ uint32 server_id;
+ uint32 flags;
+
+ /*
+ The latest binary log file name and position written by current
+ transaction, if binary log is disabled or no log event has been
+ written into binary log file by current transaction (events
+ written into transaction log cache are not counted), these two
+ member will be zero.
+ */
+ const char *log_file;
+ my_off_t log_pos;
+} Trans_param;
+
+/**
+ Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+ uint32 len;
+
+ /**
+ This callback is called after transaction commit
+
+ This callback is called right after commit to storage engines for
+ transactional tables.
+
+ For non-transactional tables, this is called at the end of the
+ statement, before sending statement status, if the statement
+ succeeded.
+
+ @note The return value is currently ignored by the server.
+
+ @param param The parameter for transaction observers
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_commit)(Trans_param *param);
+
+ /**
+ This callback is called after transaction rollback
+
+ This callback is called right after rollback to storage engines
+ for transactional tables.
+
+ For non-transactional tables, this is called at the end of the
+ statement, before sending statement status, if the statement
+ failed.
+
+ @note The return value is currently ignored by the server.
+
+ @param param The parameter for transaction observers
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+ Binlog storage flags
+*/
+enum Binlog_storage_flags {
+ /** Binary log was sync:ed */
+ BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+ Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+ uint32 server_id;
+} Binlog_storage_param;
+
+/**
+ Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+ uint32 len;
+
+ /**
+ This callback is called after binlog has been flushed
+
+ This callback is called after cached events have been flushed to
+ binary log file. Whether the binary log file is synchronized to
+ disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+ @param param Observer common parameter
+ @param log_file Binlog file name been updated
+ @param log_pos Binlog position after update
+ @param flags flags for binlog storage
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_flush)(Binlog_storage_param *param,
+ const char *log_file, my_off_t log_pos,
+ uint32 flags);
+} Binlog_storage_observer;
+
+/**
+ Replication binlog transmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+ uint32 server_id;
+ uint32 flags;
+} Binlog_transmit_param;
+
+/**
+ Observe and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+ uint32 len;
+
+ /**
+ This callback is called when binlog dumping starts
+
+
+ @param param Observer common parameter
+ @param log_file Binlog file name to transmit from
+ @param log_pos Binlog position to transmit from
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*transmit_start)(Binlog_transmit_param *param,
+ const char *log_file, my_off_t log_pos);
+
+ /**
+ This callback is called when binlog dumping stops
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*transmit_stop)(Binlog_transmit_param *param);
+
+ /**
+ This callback is called to reserve bytes in packet header for event transmission
+
+ This callback is called when resetting transmit packet header to
+ reserve bytes for this observer in packet header.
+
+ The @a header buffer is allocated by the server code, and @a size
+ is the size of the header buffer. Each observer can only reserve
+ a maximum size of @a size in the header.
+
+ @param param Observer common parameter
+ @param header Pointer of the header buffer
+ @param size Size of the header buffer
+ @param len Header length reserved by this observer
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*reserve_header)(Binlog_transmit_param *param,
+ unsigned char *header,
+ unsigned long size,
+ unsigned long *len);
+
+ /**
+ This callback is called before sending an event packet to slave
+
+ @param param Observer common parameter
+ @param packet Binlog event packet to send
+ @param len Length of the event packet
+ @param log_file Binlog file name of the event packet to send
+ @param log_pos Binlog position of the event packet to send
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*before_send_event)(Binlog_transmit_param *param,
+ unsigned char *packet, unsigned long len,
+ const char *log_file, my_off_t log_pos );
+
+ /**
+ This callback is called after sending an event packet to slave
+
+ @param param Observer common parameter
+ @param event_buf Binlog event packet buffer sent
+ @param len length of the event packet buffer
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_send_event)(Binlog_transmit_param *param,
+ const char *event_buf, unsigned long len);
+
+ /**
+ This callback is called after resetting master status
+
+ This is called when executing the command RESET MASTER, and is
+ used to reset status variables added by observers.
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+ Binlog relay IO flags
+*/
+enum Binlog_relay_IO_flags {
+ /** Binary relay log was sync:ed */
+ BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+ Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+ uint32 server_id;
+
+ /* Master host, user and port */
+ char *host;
+ char *user;
+ unsigned int port;
+
+ char *master_log_name;
+ my_off_t master_log_pos;
+
+ MYSQL *mysql; /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+ Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+ uint32 len;
+
+ /**
+ This callback is called when slave IO thread starts
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*thread_start)(Binlog_relay_IO_param *param);
+
+ /**
+ This callback is called when slave IO thread stops
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*thread_stop)(Binlog_relay_IO_param *param);
+
+ /**
+ This callback is called before slave requesting binlog transmission from master
+
+ This is called before slave issuing BINLOG_DUMP command to master
+ to request binlog.
+
+ @param param Observer common parameter
+ @param flags binlog dump flags
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+ /**
+ This callback is called after read an event packet from master
+
+ @param param Observer common parameter
+ @param packet The event packet read from master
+ @param len Length of the event packet read from master
+ @param event_buf The event packet return after process
+ @param event_len The length of event packet return after process
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_read_event)(Binlog_relay_IO_param *param,
+ const char *packet, unsigned long len,
+ const char **event_buf, unsigned long *event_len);
+
+ /**
+ This callback is called after written an event packet to relay log
+
+ @param param Observer common parameter
+ @param event_buf Event packet written to relay log
+ @param event_len Length of the event packet written to relay log
+ @param flags flags for relay log
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_queue_event)(Binlog_relay_IO_param *param,
+ const char *event_buf, unsigned long event_len,
+ uint32 flags);
+
+ /**
+ This callback is called after reset slave relay log IO status
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+ Register a transaction observer
+
+ @param observer The transaction observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+ Unregister a transaction observer
+
+ @param observer The transaction observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+ Register a binlog storage observer
+
+ @param observer The binlog storage observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+ Unregister a binlog storage observer
+
+ @param observer The binlog storage observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+ Register a binlog transmit observer
+
+ @param observer The binlog transmit observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+ Unregister a binlog transmit observer
+
+ @param observer The binlog transmit observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+ Register a binlog relay IO (slave IO thread) observer
+
+ @param observer The binlog relay IO observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+ Unregister a binlog relay IO (slave IO thread) observer
+
+ @param observer The binlog relay IO observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+ Connect to master
+
+ This function can only used in the slave I/O thread context, and
+ will use the same master information to do the connection.
+
+ @code
+ MYSQL *mysql = mysql_init(NULL);
+ if (rpl_connect_master(mysql))
+ {
+ // do stuff with the connection
+ }
+ mysql_close(mysql); // close the connection
+ @endcode
+
+ @param mysql address of MYSQL structure to use, pass NULL will
+ create a new one
+
+ @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */
=== added file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc 1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.cc 2008-09-23 15:06:18 +0000
@@ -0,0 +1,485 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+ structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+ my_off_t log_pos;
+ char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value)
+{
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(¤t_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_int(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int get_user_var_real(const char *name,
+ double *value, int *null_value)
+{
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(¤t_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_real(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int get_user_var_str(const char *name, char *value,
+ size_t len, unsigned int precision, int *null_value)
+{
+ String str;
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(¤t_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ entry->val_str(&null_val, &str, precision);
+ strncpy(value, str.c_ptr(), len);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int delegates_init()
+{
+ static unsigned char trans_mem[sizeof(Trans_delegate)];
+ static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
+#ifdef HAVE_REPLICATION
+ static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
+ static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
+#endif
+
+ if (!(transaction_delegate= new (trans_mem) Trans_delegate)
+ || (!transaction_delegate->is_inited())
+ || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
+ || (!binlog_storage_delegate->is_inited())
+#ifdef HAVE_REPLICATION
+ || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
+ || (!binlog_transmit_delegate->is_inited())
+ || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
+ || (!binlog_relay_io_delegate->is_inited())
+#endif /* HAVE_REPLICATION */
+ )
+ return 1;
+
+ if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
+ return 1;
+ return 0;
+}
+
+void delegates_destroy()
+{
+ transaction_delegate->~Trans_delegate();
+ binlog_storage_delegate->~Binlog_storage_delegate();
+#ifdef HAVE_REPLICATION
+ binlog_transmit_delegate->~Binlog_transmit_delegate();
+ binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+ This macro is used by almost all the Delegate methods to iterate
+ over all the observers running given callback function of the
+ delegate .
+
+ Add observer plugins to the thd->lex list, after each statement, all
+ plugins add to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(r, f, thd, args) \
+ param.server_id= thd->server_id; \
+ read_lock(); \
+ Observer_info_iterator iter= observer_info_iter(); \
+ Observer_info *info= iter++; \
+ for (; info; info= iter++) \
+ { \
+ plugin_ref plugin= \
+ my_plugin_lock(thd, &info->plugin); \
+ if (!plugin) \
+ { \
+ r= 1; \
+ break; \
+ } \
+ if (((Observer *)info->observer)->f \
+ && ((Observer *)info->observer)->f args) \
+ { \
+ r= 1; \
+ break; \
+ } \
+ } \
+ unlock()
+
+
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+ Trans_param param;
+ bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+ if (is_real_trans)
+ param.flags |= TRANS_IS_REAL_TRANS;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ param.log_file= log_info ? log_info->log_file : 0;
+ param.log_pos= log_info ? log_info->log_pos : 0;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
+
+ /*
+ This is the end of a real transaction or autocommit statement, we
+ can free the memory allocated for binlog file and position.
+ */
+ if (is_real_trans && log_info)
+ {
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+ my_free(log_info, MYF(0));
+ }
+ return ret;
+}
+
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+ Trans_param param;
+ bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+ if (is_real_trans)
+ param.flags |= TRANS_IS_REAL_TRANS;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ param.log_file= log_info ? log_info->log_file : 0;
+ param.log_pos= log_info ? log_info->log_pos : 0;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
+
+ /*
+ This is the end of a real transaction or autocommit statement, we
+ can free the memory allocated for binlog file and position.
+ */
+ if (is_real_trans && log_info)
+ {
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+ my_free(log_info, MYF(0));
+ }
+ return ret;
+}
+
+int Binlog_storage_delegate::after_flush(THD *thd,
+ const char *log_file,
+ my_off_t log_pos,
+ bool synced)
+{
+ Binlog_storage_param param;
+ uint32 flags=0;
+ if (synced)
+ flags |= BINLOG_STORAGE_IS_SYNCED;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ if (!log_info)
+ {
+ if(!(log_info=
+ (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+ return 1;
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+ }
+
+ strcpy(log_info->log_file, log_file+dirname_length(log_file));
+ log_info->log_pos = log_pos;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_flush, thd,
+ (¶m, log_info->log_file, log_info->log_pos, flags));
+ return ret;
+}
+
+#ifdef HAVE_REPLICATION
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
+ return ret;
+}
+
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m));
+ return ret;
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+ String *packet)
+{
+ /* NOTE2ME: Maximum extra header size for each observer, I hope 32
+ bytes should be enough for each Observer to reserve their extra
+ header. If later found this is not enough, we can increase this
+ /HEZX
+ */
+#define RESERVE_HEADER_SIZE 32
+ unsigned char header[RESERVE_HEADER_SIZE];
+ ulong hlen;
+ Binlog_transmit_param param;
+ param.flags= flags;
+ param.server_id= thd->server_id;
+
+ int ret= 0;
+ read_lock();
+ Observer_info_iterator iter= observer_info_iter();
+ Observer_info *info= iter++;
+ for (; info; info= iter++)
+ {
+ plugin_ref plugin=
+ my_plugin_lock(thd, &info->plugin);
+ if (!plugin)
+ {
+ ret= 1;
+ break;
+ }
+ hlen= 0;
+ if (((Observer *)info->observer)->reserve_header
+ && ((Observer *)info->observer)->reserve_header(¶m,
+ header,
+ RESERVE_HEADER_SIZE,
+ &hlen))
+ {
+ ret= 1;
+ break;
+ }
+ if (hlen == 0)
+ continue;
+ if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+ {
+ ret= 1;
+ break;
+ }
+ }
+ unlock();
+ return ret;
+}
+
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+ String *packet,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, before_send_event, thd,
+ (¶m, (uchar *)packet->c_ptr(),
+ packet->length(),
+ log_file+dirname_length(log_file), log_pos));
+ return ret;
+}
+
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+ String *packet)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_send_event, thd,
+ (¶m, packet->c_ptr(), packet->length()));
+ return ret;
+}
+
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m));
+ return ret;
+}
+
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+ Master_info *mi)
+{
+ param->mysql= mi->mysql;
+ param->user= mi->user;
+ param->host= mi->host;
+ param->port= mi->port;
+ param->master_log_name= mi->master_log_name;
+ param->master_log_pos= mi->master_log_pos;
+}
+
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+ Binlog_relay_IO_param param;
+ init_param(¶m, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, thread_start, thd, (¶m));
+ return ret;
+}
+
+
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+
+ Binlog_relay_IO_param param;
+ init_param(¶m, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, thread_stop, thd, (¶m));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+ Master_info *mi,
+ ushort flags)
+{
+ Binlog_relay_IO_param param;
+ init_param(¶m, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+ const char *packet, ulong len,
+ const char **event_buf,
+ ulong *event_len)
+{
+ Binlog_relay_IO_param param;
+ init_param(¶m, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_read_event, thd,
+ (¶m, packet, len, event_buf, event_len));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+ const char *event_buf,
+ ulong event_len,
+ bool synced)
+{
+ Binlog_relay_IO_param param;
+ init_param(¶m, mi);
+
+ uint32 flags=0;
+ if (synced)
+ flags |= BINLOG_STORAGE_IS_SYNCED;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_queue_event, thd,
+ (¶m, event_buf, event_len, flags));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+
+{
+ Binlog_relay_IO_param param;
+ init_param(¶m, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m));
+ return ret;
+}
+#endif /* HAVE_REPLICATION */
+
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+ return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+ return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+ return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+ return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+#ifdef HAVE_REPLICATION
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+ return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+ return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+ return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+ return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+#endif /* HAVE_REPLICATION */
=== added file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h 1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.h 2008-09-23 15:06:18 +0000
@@ -0,0 +1,213 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+class Observer_info {
+public:
+ void *observer;
+ st_plugin_int *plugin_int;
+ plugin_ref plugin;
+
+ Observer_info(void *ob, st_plugin_int *p)
+ :observer(ob), plugin_int(p)
+ {
+ plugin= plugin_int_to_ref(plugin_int);
+ }
+};
+
+class Delegate {
+public:
+ typedef List<Observer_info> Observer_info_list;
+ typedef List_iterator<Observer_info> Observer_info_iterator;
+
+ int add_observer(void *observer, st_plugin_int *plugin)
+ {
+ int ret= FALSE;
+ if (!inited)
+ return TRUE;
+ write_lock();
+ Observer_info_iterator iter(observer_info_list);
+ Observer_info *info= iter++;
+ while (info && info->observer != observer)
+ info= iter++;
+ if (!info)
+ {
+ info= new Observer_info(observer, plugin);
+ if (!info || observer_info_list.push_back(info, &memroot))
+ ret= TRUE;
+ }
+ else
+ ret= TRUE;
+ unlock();
+ return ret;
+ }
+
+ int remove_observer(void *observer, st_plugin_int *plugin)
+ {
+ int ret= FALSE;
+ if (!inited)
+ return TRUE;
+ write_lock();
+ Observer_info_iterator iter(observer_info_list);
+ Observer_info *info= iter++;
+ while (info && info->observer != observer)
+ info= iter++;
+ if (info)
+ iter.remove();
+ else
+ ret= TRUE;
+ unlock();
+ return ret;
+ }
+
+ inline Observer_info_iterator observer_info_iter()
+ {
+ return Observer_info_iterator(observer_info_list);
+ }
+
+ inline bool is_empty()
+ {
+ return observer_info_list.is_empty();
+ }
+
+ inline int read_lock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_rdlock(&lock);
+ }
+
+ inline int write_lock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_wrlock(&lock);
+ }
+
+ inline int unlock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_unlock(&lock);
+ }
+
+ inline bool is_inited()
+ {
+ return inited;
+ }
+
+ Delegate()
+ {
+ inited= FALSE;
+ if (my_rwlock_init(&lock, NULL))
+ return;
+ init_sql_alloc(&memroot, 1024, 0);
+ inited= TRUE;
+ }
+ ~Delegate()
+ {
+ inited= FALSE;
+ rwlock_destroy(&lock);
+ free_root(&memroot, MYF(0));
+ }
+
+private:
+ Observer_info_list observer_info_list;
+ rw_lock_t lock;
+ MEM_ROOT memroot;
+ bool inited;
+};
+
+class Trans_delegate
+ :public Delegate {
+public:
+ typedef Trans_observer Observer;
+ int before_commit(THD *thd, bool all);
+ int before_rollback(THD *thd, bool all);
+ int after_commit(THD *thd, bool all);
+ int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+ :public Delegate {
+public:
+ typedef Binlog_storage_observer Observer;
+ int after_flush(THD *thd, const char *log_file,
+ my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+ :public Delegate {
+public:
+ typedef Binlog_transmit_observer Observer;
+ int transmit_start(THD *thd, ushort flags,
+ const char *log_file, my_off_t log_pos);
+ int transmit_stop(THD *thd, ushort flags);
+ int reserve_header(THD *thd, ushort flags, String *packet);
+ int before_send_event(THD *thd, ushort flags,
+ String *packet, const
+ char *log_file, my_off_t log_pos );
+ int after_send_event(THD *thd, ushort flags,
+ String *packet);
+ int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+ :public Delegate {
+public:
+ typedef Binlog_relay_IO_observer Observer;
+ int thread_start(THD *thd, Master_info *mi);
+ int thread_stop(THD *thd, Master_info *mi);
+ int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+ int after_read_event(THD *thd, Master_info *mi,
+ const char *packet, ulong len,
+ const char **event_buf, ulong *event_len);
+ int after_queue_event(THD *thd, Master_info *mi,
+ const char *event_buf, ulong event_len,
+ bool synced);
+ int after_reset_slave(THD *thd, Master_info *mi);
+private:
+ void init_param(Binlog_relay_IO_param *param, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+ if there is no observers in the delegate, we can return 0
+ immediately.
+*/
+#define RUN_HOOK(group, hook, args) \
+ (group ##_delegate->is_empty() ? \
+ 0 : group ##_delegate->hook args)
+
+#endif /* RPL_HANDLER_H */
=== modified file 'sql/slave.cc'
--- a/sql/slave.cc 2008-08-08 01:39:23 +0000
+++ b/sql/slave.cc 2008-09-26 14:05:38 +0000
@@ -40,6 +40,7 @@
#include <sql_common.h>
#include <errmsg.h>
#include <mysys_err.h>
+#include "rpl_handler.h"
#ifdef HAVE_REPLICATION
@@ -68,6 +69,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1;
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
enum enum_slave_reconnect_actions
{
SLAVE_RECON_ACT_REG= 0,
@@ -226,6 +229,10 @@ int init_slave()
TODO: re-write this to interate through the list of files
for multi-master
*/
+
+ if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+ goto err;
+
active_mi= new Master_info;
/*
@@ -1500,17 +1507,22 @@ static int safe_sleep(THD* thd, int sec,
}
-static int request_dump(MYSQL* mysql, Master_info* mi,
- bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+ bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
- int binlog_flags = 0; // for now
+ ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
+ if (RUN_HOOK(binlog_relay_io,
+ before_request_transmit,
+ (thd, mi, binlog_flags)))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2135,6 +2147,12 @@ pthread_handler_t handle_slave_io(void *
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
+ /* This must be called before run any binlog_relay_io hooks */
+ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ goto err;
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2209,7 +2227,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
- if (request_dump(mysql, mi, &suppress_warnings))
+ if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2229,6 +2247,7 @@ requesting master dump") ||
goto err;
goto connected;
});
+ const char *event_buf;
while (!io_slave_killed(thd,mi))
{
@@ -2284,10 +2303,27 @@ Stopping slave I/O thread due to out-of-
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
+ event_buf= (const char*)mysql->net.read_pos + 1;
+ if (RUN_HOOK(binlog_relay_io, after_read_event,
+ (thd, mi,(const char*)mysql->net.read_pos + 1,
+ event_len, &event_buf, &event_len)))
+ {
+ sql_print_error("Failed to run 'after_read_event' hook");
+ goto err;
+ }
+
+ /* XXX: 'synced' should be updated by queue_event to indicate
+ whether event has been synced to disk */
+ bool synced= 0;
+ if (queue_event(mi, event_buf, event_len))
{
goto err;
}
+
+ if (RUN_HOOK(binlog_relay_io, after_queue_event,
+ (thd, mi, event_buf, event_len, synced)))
+ goto err;
+
if (flush_master_info(mi, 1))
{
sql_print_error("Failed to flush master info file");
@@ -2334,6 +2370,7 @@ err:
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
pthread_mutex_lock(&LOCK_thread_count);
+ RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->query = thd->db = 0; // extra safety
thd->query_length= thd->db_length= 0;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -3454,6 +3491,64 @@ static int safe_reconnect(THD* thd, MYSQ
}
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+ THD *thd= current_thd;
+ Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+ if (!mi)
+ {
+ sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+ return NULL;
+ }
+
+ bool allocated= false;
+
+ if (!mysql)
+ {
+ if(!(mysql= mysql_init(NULL)))
+ return NULL;
+ allocated= true;
+ }
+
+ /*
+ XXX: copied from connect_to_master, this function should not
+ change the slave status, so we cannot use connect_to_master
+ directly
+
+ TODO: make this part a seperate function to eliminate duplication
+ */
+ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+ mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+ if (mi->ssl)
+ {
+ mysql_ssl_set(mysql,
+ mi->ssl_key[0]?mi->ssl_key:0,
+ mi->ssl_cert[0]?mi->ssl_cert:0,
+ mi->ssl_ca[0]?mi->ssl_ca:0,
+ mi->ssl_capath[0]?mi->ssl_capath:0,
+ mi->ssl_cipher[0]?mi->ssl_cipher:0);
+ mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+ &mi->ssl_verify_server_cert);
+ }
+#endif
+
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /* This one is not strictly needed but we have it here for completeness */
+ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+ if (io_slave_killed(thd, mi)
+ || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ if (allocated)
+ mysql_close(mysql); // this will free the object
+ return NULL;
+ }
+ return mysql;
+}
+
/*
Store the file and position where the execute-slave thread are in the
relay log.
=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc 2008-10-01 12:02:28 +0000
+++ b/sql/sql_insert.cc 2008-10-07 17:04:28 +0000
@@ -3806,10 +3806,10 @@ void select_create::abort()
DBUG_ENTER("select_create::abort");
/*
- In select_insert::abort() we roll back the statement, including
- truncating the transaction cache of the binary log. To do this, we
- pretend that the statement is transactional, even though it might
- be the case that it was not.
+ We roll back the statement here, including truncating the
+ transaction cache of the binary log. To do this, we pretend that
+ the statement is transactional, even though it might be the case
+ that it was not.
We roll back the statement prior to deleting the table and prior
to releasing the lock on the table, since there might be potential
@@ -3823,6 +3823,7 @@ void select_create::abort()
tmp_disable_binlog(thd);
select_insert::abort();
thd->transaction.stmt.modified_non_trans_table= FALSE;
+ trans_rollback_stmt(thd);
reenable_binlog(thd);
thd->binlog_flush_pending_rows_event(TRUE);
=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc 2008-10-02 14:03:57 +0000
+++ b/sql/sql_parse.cc 2008-10-07 17:04:28 +0000
@@ -21,6 +21,7 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
+#include "rpl_handler.h"
#include "sp_head.h"
#include "sp.h"
=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc 2008-08-19 15:58:48 +0000
+++ b/sql/sql_plugin.cc 2008-09-23 14:33:18 +0000
@@ -20,14 +20,6 @@
#define REPORT_TO_LOG 1
#define REPORT_TO_USER 2
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
extern struct st_mysql_plugin *mysqld_builtins[];
char *opt_plugin_load= NULL;
@@ -44,7 +36,8 @@ const LEX_STRING plugin_type_names[MYSQL
{ C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") },
{ C_STRING_WITH_LEN("INFORMATION SCHEMA") },
- { C_STRING_WITH_LEN("AUDIT") }
+ { C_STRING_WITH_LEN("AUDIT") },
+ { C_STRING_WITH_LEN("REPLICATION") },
};
extern int initialize_schema_table(st_plugin_int *plugin);
@@ -89,7 +82,8 @@ static int min_plugin_info_interface_ver
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
- MYSQL_AUDIT_INTERFACE_VERSION
+ MYSQL_AUDIT_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION
};
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{
@@ -98,7 +92,8 @@ static int cur_plugin_info_interface_ver
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
- MYSQL_AUDIT_INTERFACE_VERSION
+ MYSQL_AUDIT_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION
};
static bool initialized= 0;
=== modified file 'sql/sql_plugin.h'
--- a/sql/sql_plugin.h 2008-06-27 20:56:54 +0000
+++ b/sql/sql_plugin.h 2008-09-23 14:33:18 +0000
@@ -18,6 +18,14 @@
class sys_var;
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) A
+#define plugin_int_to_ref(A) A
+#else
+#define plugin_ref_to_int(A) (A ? A[0] : NULL)
+#define plugin_int_to_ref(A) &(A)
+#endif
+
/*
the following flags are valid for plugin_init()
*/
=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc 2008-09-04 18:30:34 +0000
+++ b/sql/sql_repl.cc 2008-09-23 14:33:18 +0000
@@ -21,6 +21,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "rpl_handler.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -378,11 +379,36 @@ static int send_heartbeat_event(NET* net
{
DBUG_RETURN(-1);
}
- packet->set("\0", 1, &my_charset_bin);
DBUG_RETURN(0);
}
/*
+ Reset thread transmit packet buffer for event sending
+
+ This function allocates header bytes for event transmission, and
+ should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+ ulong *ev_offset, const char **errmsg)
+{
+ int ret= 0;
+ String *packet= &thd->packet;
+
+ /* reserve and set default header */
+ packet->length(0);
+ packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ {
+ *errmsg= "Failed to run hook 'reserve_header'";
+ my_errno= ER_UNKNOWN_ERROR;
+ ret= 1;
+ }
+ *ev_offset= packet->length();
+ return ret;
+}
+
+/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -392,6 +418,9 @@ void mysql_binlog_send(THD* thd, char* l
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
+
+ ulong ev_offset;
+
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
@@ -407,6 +436,9 @@ void mysql_binlog_send(THD* thd, char* l
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ thd->server_id, log_ident, (ulong)pos);
+
/*
heartbeat_period from @master_heartbeat_period user variable
*/
@@ -423,6 +455,13 @@ void mysql_binlog_send(THD* thd, char* l
coord->file_name= log_file_name; // initialization basing on what slave remembers
coord->pos= pos;
}
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
@@ -477,11 +516,9 @@ impossible position";
goto err;
}
- /*
- We need to start a packet with something other than 255
- to distinguish it from error
- */
- packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
/*
Tell the client about the log name with a fake Rotate event;
@@ -521,7 +558,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1, &my_charset_bin);
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
@@ -537,6 +574,11 @@ impossible position";
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
+ /* reset transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Try to find a Format_description_log_event at the beginning of
the binlog
@@ -544,29 +586,30 @@ impossible position";
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
- The packet has offsets equal to the normal offsets in a binlog
- event +1 (the first character is \0).
+ The packet has offsets equal to the normal offsets in a
+ binlog event + ev_offset (the first ev_offset characters are
+ the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+ if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+1, (ulong) 0);
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -592,8 +635,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
- /* reset the packet as we wrote to it in any case */
- packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
@@ -605,6 +646,12 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
+ Log_event_type event_type= UNKNOWN_EVENT;
+
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
@@ -620,17 +667,26 @@ impossible position";
log's filename does not change while it's active
*/
if (coord)
- coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+ coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
@@ -638,9 +694,8 @@ impossible position";
goto err;
}
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ DBUG_PRINT("info", ("log event code %d", event_type));
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -649,7 +704,17 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ errmsg= "Failed to run hook 'after_send_event'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /* reset transmit packet for next loop */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
}
/*
@@ -700,6 +765,11 @@ impossible position";
}
#endif
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
@@ -718,6 +788,7 @@ impossible position";
read_packet = 1;
if (coord)
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF:
@@ -753,6 +824,9 @@ impossible position";
sql_print_information("the rest of heartbeat info skipped ...");
}
#endif
+ /* reset transmit packet for the heartbeat event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
if (send_heartbeat_event(net, packet, coord))
{
errmsg = "Failed on my_net_write()";
@@ -778,8 +852,17 @@ impossible position";
}
if (read_packet)
- {
- thd_proc_info(thd, "Sending binlog event to slave");
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -787,7 +870,7 @@ impossible position";
goto err;
}
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -796,11 +879,13 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- /*
- No need to net_flush because we will get to flush later when
- we hit EOF pretty quick
- */
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "Failed to run hook 'after_send_event'";
+ goto err;
+ }
}
if (fatal_error)
@@ -836,6 +921,10 @@ impossible position";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
+ /* reset transmit packet for the possible fake rotate event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
@@ -853,8 +942,6 @@ impossible position";
goto err;
}
- packet->length(0);
- packet->append('\0');
if (coord)
coord->file_name= log_file_name; // reset to the next
}
@@ -864,6 +951,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
pthread_mutex_lock(&LOCK_thread_count);
@@ -874,6 +962,7 @@ end:
err:
thd_proc_info(thd, "Waiting to finalize termination");
end_io_cache(&log);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -1134,6 +1223,7 @@ int reset_slave(THD *thd, Master_info* m
goto err;
}
+ RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
@@ -1416,7 +1506,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
- return mysql_bin_log.reset_logs(thd);
+
+ if (mysql_bin_log.reset_logs(thd))
+ return 1;
+ RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ return 0;
}
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1926,5 +2020,3 @@ int init_replication_sys_vars()
}
#endif /* HAVE_REPLICATION */
-
-
=== modified file 'sql/transaction.cc'
--- a/sql/transaction.cc 2008-08-12 22:30:55 +0000
+++ b/sql/transaction.cc 2008-09-23 14:33:18 +0000
@@ -20,6 +20,7 @@
#include "transaction.h"
#include "mysql_priv.h"
+#include "rpl_handler.h"
#ifdef WITH_MARIA_STORAGE_ENGINE
#include "../storage/maria/ha_maria.h"
@@ -97,6 +98,14 @@ bool trans_commit(THD *thd)
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= ha_commit_trans(thd, TRUE);
+ if (res)
+ /*
+ if res is non-zero, then ha_commit_trans has rolled back the
+ transaction, so the hooks for rollback will be called.
+ */
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ else
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
thd->lex->start_transaction_opt= 0;
@@ -163,6 +172,7 @@ bool trans_rollback(THD *thd)
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= ha_rollback_trans(thd, TRUE);
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
thd->lex->start_transaction_opt= 0;
@@ -192,6 +202,15 @@ bool trans_commit_stmt(THD *thd)
int res= FALSE;
if (thd->transaction.stmt.ha_list)
res= ha_commit_trans(thd, FALSE);
+
+ if (res)
+ /*
+ if res is non-zero, then ha_commit_trans has rolled back the
+ transaction, so the hooks for rollback will be called.
+ */
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ else
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
DBUG_RETURN(test(res));
}
@@ -216,6 +235,8 @@ bool trans_rollback_stmt(THD *thd)
ha_rollback_trans(thd, TRUE);
}
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+
DBUG_RETURN(FALSE);
}
=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp 2008-09-27 18:04:19 +0000
+++ b/storage/falcon/Cache.cpp 2008-10-02 23:06:04 +0000
@@ -50,11 +50,9 @@
extern uint falcon_io_threads;
//#define STOP_PAGE 55
-//#define CACHE_TRACE_FILE "cache.trace"
+#define TRACE_FILE "cache.trace"
-#ifdef CACHE_TRACE_FILE
static FILE *traceFile;
-#endif // CACHE_TRACE_FILE
static const uint64 cacheHunkSize = 1024 * 1024 * 128;
static const int ASYNC_BUFFER_SIZE = 1024000;
@@ -70,50 +68,21 @@ static const char THIS_FILE[]=__FILE__;
Cache::Cache(Database *db, int pageSz, int hashSz, int numBuffers)
{
- openTraceFile();
+ //openTraceFile();
database = db;
panicShutdown = false;
pageSize = pageSz;
-
- unsigned int highBit;
- for (highBit=0x01; highBit < (unsigned int)hashSz; highBit= highBit << 1) { }
-
- // if there are more than 4096 buckets then lets round down
- // else lets round up
- if (highBit >= 0x00001000)
- {
- // use power of two rounded down
- hashSize = highBit << 1;
- }
- else
- {
- // use power of two rounded up
- hashSize = highBit;
- }
-
- hashMask = hashSize - 1;
+ hashSize = hashSz;
numberBuffers = numBuffers;
upperFraction = numberBuffers / 4;
bufferAge = 0;
firstDirty = NULL;
lastDirty = NULL;
+ numberDirtyPages = 0;
pageWriter = NULL;
- hashTable = new Bdb* [hashSize];
+ hashTable = new Bdb* [hashSz];
memset (hashTable, 0, sizeof (Bdb*) * hashSize);
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
- syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
- for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
- syncHashTable[loop].setName("Cache::syncHashTable");
-
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
- syncHashTable = new SyncObject [hashSize];
- for (int loop = 0; loop < hashSize; loop ++)
- {
- char tmpName[128];
- snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
- syncHashTable[loop].setName(tmpName);
- }
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+
if (falcon_use_sectorcache)
sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
@@ -133,10 +102,9 @@ Cache::Cache(Database *db, int pageSz, i
ioThreads = new Thread*[numberIoThreads];
memset(ioThreads, 0, numberIoThreads * sizeof(ioThreads[0]));
flushing = false;
-
+
try
{
- // non-protected access to bdbs,endBdbs is OK during initialization
bdbs = new Bdb [numberBuffers];
endBdbs = bdbs + numberBuffers;
int remaining = 0;
@@ -155,7 +123,6 @@ Cache::Cache(Database *db, int pageSz, i
}
bdb->cache = this;
- // non-protected access to bufferQueue is OK during initialization
bufferQueue.append(bdb);
bdb->buffer = (Page*) stuff;
stuff += pageSize;
@@ -181,17 +148,17 @@ Cache::Cache(Database *db, int pageSz, i
Cache::~Cache()
{
-
- closeTraceFile();
+ if (traceFile)
+ closeTraceFile();
delete [] hashTable;
- delete [] syncHashTable;
delete [] bdbs;
delete [] ioThreads;
delete flushBitmap;
+
if (falcon_use_sectorcache)
delete sectorCache;
-
+
if (bufferHunks)
{
for (int n = 0; n < numberHunks; ++n)
@@ -204,16 +171,19 @@ Cache::~Cache()
Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
{
ASSERT (pageNumber >= 0);
- Bdb *bdb;
-
- /* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+ Sync sync (&syncObject, "Cache::probePage");
+ sync.lock (Shared);
+ Bdb *bdb = findBdb(dbb, pageNumber);
+
if (bdb)
{
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+
if (bdb->buffer->pageType == PAGE_free)
{
bdb->decrementUseCount(REL_HISTORY);
-
+
return NULL;
}
@@ -226,49 +196,11 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
return NULL;
}
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
-{
- for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- {
- if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
- {
- return bdb;
- }
- }
-
- return NULL;
-}
-
Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
{
- return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
-{
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
- lockHash.lock (Shared);
- Bdb *bdb;
-
- bdb = findBdb(dbb, pageNumber, slot);
- if (bdb != NULL)
- bdb->incrementUseCount(ADD_HISTORY);
-
- return bdb;
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
-{
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
- lockHash.lock (Shared);
-
- for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
- if (bdb->pageNumber == pageNumber)
- {
- bdb->incrementUseCount(ADD_HISTORY);
+ for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
return bdb;
- }
return NULL;
}
@@ -278,106 +210,95 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
if (panicShutdown)
{
Thread *thread = Thread::getThread("Cache::fetchPage");
-
+
if (thread->pageMarks == 0)
throw SQLError(RUNTIME_ERROR, "Emergency shut is underway");
}
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
Log::debug("fetching page %d/%d\n", pageNumber, dbb->tableSpaceId);
#endif
ASSERT (pageNumber >= 0);
+ int slot = pageNumber % hashSize;
+ LockType actual = lockType;
+ Sync sync (&syncObject, "Cache::fetchPage");
+ sync.lock (Shared);
+ int hit = 0;
+
+ /* If we already have a buffer for this go, we're done */
+
Bdb *bdb;
- /* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ //syncObject.validateShared("Cache::fetchPage");
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ hit = 1;
+ break;
+ }
+
if (!bdb)
{
- // getFreeBuffer() locks a hash bucket to remove the candidate bdb
- // if we locked our hash bucket before the call then we could have
- // a deadlock
- // thus we get the free buffer before we lock the hash bucket we will
- // be inserting into. This avoids a dead lock but generates a race
- // we take care of the race by reversing the getFreeBuffer() work
- // when we lose the race
- Bdb *bdbAvailable;
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
-
- bdbAvailable = getFreeBuffer();
- /* assume we'll be inserting this new BDB. Set new page number. */
- bdbAvailable->pageNumber = pageNumber;
- bdbAvailable->dbb = dbb;
+ sync.unlock();
+ actual = Exclusive;
+ sync.lock(Exclusive);
+
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ //syncObject.validateExclusive("Cache::fetchPage (retry)");
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ hit = 2;
+ break;
+ }
- lockHash.lock(Exclusive);
- bdb = findBdb(dbb, pageNumber, slot);
if (!bdb)
{
- // we won the race so lets use the free bdb
- // relink into hash table
- bdbAvailable->hash = hashTable [slot];
- hashTable [slot] = bdbAvailable;
- lockHash.unlock();
+ bdb = findBuffer(dbb, pageNumber, actual);
+ moveToHead(bdb);
+ sync.unlock();
- bdb = bdbAvailable;
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE
if (bdb->pageNumber == STOP_PAGE)
Log::debug("reading page %d/%d\n", bdb->pageNumber, dbb->tableSpaceId);
#endif
-
+
Priority priority(database->ioScheduler);
- priority.schedule(PRIORITY_MEDIUM);
+ priority.schedule(PRIORITY_MEDIUM);
if (falcon_use_sectorcache)
sectorCache->readPage(bdb);
else
dbb->readPage(bdb);
-
priority.finished();
#ifdef HAVE_PAGE_NUMBER
ASSERT(bdb->buffer->pageNumber == pageNumber);
-#endif
- if (Exclusive != lockType)
+#endif
+ if (actual != lockType)
bdb->downGrade(lockType);
-
- }
- else
- {
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- lockHash.unlock();
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
-
- // lost a race. put our available back to useable
- // side effect, bdbAvailable will have to age again before we re-use it.
- bdbAvailable->hash = NULL;
- bdbAvailable->pageNumber = -1;
- bdbAvailable->dbb = NULL;
- bdbAvailable->release(REL_HISTORY);
}
}
- else
- {
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
- }
Page *page = bdb->buffer;
-
+
/***
if (page->checksum != (short) pageNumber)
FATAL ("page %d wrong page number, got %d\n",
bdb->pageNumber, page->checksum);
***/
-
+
if (pageType && page->pageType != pageType)
{
/*** future code
- bdb->release(REL_HISTORY);
+ bdb->release();
throw SQLError (DATABASE_CORRUPTION, "page %d wrong page type, expected %d got %d\n",
pageNumber, pageType, page->pageType);
***/
@@ -385,6 +306,14 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
bdb->pageNumber, dbb->tableSpaceId, pageType, page->pageType);
}
+ // If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
+
+ if (bdb->age < bufferAge - (uint64) upperFraction)
+ {
+ sync.lock (Exclusive);
+ moveToHead (bdb);
+ }
+
ASSERT (bdb->pageNumber == pageNumber);
ASSERT (bdb->dbb == dbb);
ASSERT (bdb->useCount > 0);
@@ -394,74 +323,43 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
{
- Bdb *bdb;
+ Sync sync(&syncObject, "Cache::fakePage");
+ sync.lock(Exclusive);
+ int slot = pageNumber % hashSize;
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE
if (pageNumber == STOP_PAGE)
Log::debug("faking page %d/%d\n",pageNumber, dbb->tableSpaceId);
#endif
/* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
- if (!bdb)
- {
- // getFreeBuffer() locks a hash bucket to remove the candidate bdb
- // if we locked our hash bucket before the call then we could have
- // a deadlock
- // thus we get the free buffer before we lock the hash bucket we will
- // be inserting into. This avoids a dead lock but generates a race
- // we take care of the race by reversing the getFreeBuffer() work
- // when we lose the race
- Bdb *bdbAvailable;
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
-
- bdbAvailable = getFreeBuffer();
- /* assume we'll be inserting this new BDB. Set new page number. */
- bdbAvailable->pageNumber = pageNumber;
- bdbAvailable->dbb = dbb;
- lockHash.lock(Exclusive);
- bdb = findBdb(dbb, pageNumber, slot);
- if (!bdb)
- {
- // we won the race so lets use the free bdb
- // relink into hash table
- bdbAvailable->hash = hashTable [slot];
- hashTable [slot] = bdbAvailable;
- lockHash.unlock();
+ Bdb *bdb;
- bdb = bdbAvailable;
- }
- else
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
{
- //syncObject.validateExclusive("Cache::fetchPage (retry)");
- bdb->incrementUseCount(ADD_HISTORY);
- lockHash.unlock();
+ if (bdb->syncObject.isLocked())
+ {
+ // The pageWriter may still be cleaning up this freed page with a shared lock
+ ASSERT(bdb->buffer->pageType == PAGE_free);
+ ASSERT(bdb->syncObject.getState() >= 0);
+ }
+
bdb->addRef(Exclusive COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
-
- // lost a race. put our available back to useable
- // side effect, bdbAvailable will have to age again before we re-use it.
- bdbAvailable->hash = NULL;
- bdbAvailable->pageNumber = -1;
- bdbAvailable->dbb = NULL;
- bdbAvailable->release(REL_HISTORY);
+
+ break;
}
- }
- else
- {
- bdb->addRef(Exclusive COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
- }
+
+ if (!bdb)
+ bdb = findBuffer(dbb, pageNumber, Exclusive);
if (!dbb->isReadOnly)
bdb->mark(transId);
-
+
memset(bdb->buffer, 0, pageSize);
bdb->setPageHeader(type);
+ moveToHead(bdb);
return bdb;
}
@@ -469,19 +367,19 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
void Cache::flush(int64 arg)
{
Sync flushLock(&syncFlush, "Cache::flush(1)");
- Sync dirtyLock(&syncDirty, "Cache::flush(2)");
+ Sync sync(&syncDirty, "Cache::flush(2)");
flushLock.lock(Exclusive);
-
+
if (flushing)
return;
syncWait.lock(NULL, Exclusive);
+ sync.lock(Shared);
//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
flushArg = arg;
flushPages = 0;
physicalWrites = 0;
-
- dirtyLock.lock(Shared);
+
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
{
bdb->flushIt = true;
@@ -489,14 +387,14 @@ void Cache::flush(int64 arg)
++flushPages;
}
- dirtyLock.unlock();
-
- analyzeFlush();
+ if (traceFile)
+ analyzeFlush();
flushStart = database->timestamp;
flushing = true;
+ sync.unlock();
flushLock.unlock();
-
+
for (int n = 0; n < numberIoThreads; ++n)
if (ioThreads[n])
ioThreads[n]->wake();
@@ -504,135 +402,69 @@ void Cache::flush(int64 arg)
void Cache::moveToHead(Bdb * bdb)
{
- // If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
- // non-protected access to age is harmless since it is fuzzy anyway
- if (bdb->age < bufferAge - (uint64) upperFraction)
- {
- Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
-
- bufferQueueLock.lock (Exclusive);
- bdb->age = bufferAge++;
- bufferQueue.remove(bdb);
- bufferQueue.prepend(bdb);
- //validateUnique (bdb);
- }
-}
-
-void Cache::moveToHeadAlreadyLocked(Bdb * bdb)
-{
bdb->age = bufferAge++;
bufferQueue.remove(bdb);
bufferQueue.prepend(bdb);
//validateUnique (bdb);
}
-Bdb* Cache::getFreeBuffer(void)
+Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
{
- Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
- unsigned int count;
+ //syncObject.validateExclusive("Cache::findBuffer");
+ int slot = pageNumber % hashSize;
+ Sync sync(&syncDirty, "Cache::findBuffer");
+
+ /* Find least recently used, not-in-use buffer */
+
Bdb *bdb;
// Find a candidate BDB.
+
for (;;)
{
- bufferQueueLock.lock (Exclusive);
- // find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
- for (count = 0, bdb = bufferQueue.last; bdb ; bdb = bdb->prior, count++)
- {
- if (count >= upperFraction)
- {
- bdb = NULL;
- break;
- }
-
+ for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
if (bdb->useCount == 0)
- {
- if (!bdb->isDirty)
- {
- bdb->incrementUseCount(REL_HISTORY);
- moveToHeadAlreadyLocked(bdb);
- break;
- }
- }
- else
- {
- // get this one out of the way so we don't search it every time
- moveToHeadAlreadyLocked(bdb);
-#ifdef CHECK_STALLED_BDB
- bdb->stallCount++;
- if ((bdb->stallCount & 0x03) == 0x03)
- {
- Log::debug("Page %d is in use and aged %d times\n",
- bdb->pageNumber, bdb->stallCount);
- }
-#endif // CHECK_STALLED_BDB
- }
- }
- if (!bdb)
- // find a candidate that is NOT in use, could be dirty
- for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
- if (bdb->useCount == 0)
- {
- bdb->incrementUseCount(REL_HISTORY);
- moveToHeadAlreadyLocked(bdb);
- break;
- }
-
- bufferQueueLock.unlock();
+ break;
if (!bdb)
throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
+
+ if (!bdb->isDirty)
+ break;
+
+ writePage (bdb, WRITE_TYPE_REUSE);
+ }
- if (bdb->pageNumber >= 0)
- {
- int slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
- Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)], "Cache::getFreeBuffer");
- lockHashRemove.lock(Exclusive);
+ /* Unlink its old incarnation from the page/hash table */
- if (bdb->useCount != 1)
+ if (bdb->pageNumber >= 0)
+ for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr = &(*ptr)->hash)
+ if (*ptr == bdb)
{
- // we lost a race try again
- bdb->decrementUseCount(REL_HISTORY);
- lockHashRemove.unlock();
- continue;
+ *ptr = bdb->hash;
+ break;
}
+ else
+ ASSERT (*ptr);
- if (bdb->isDirty)
- writePage (bdb, WRITE_TYPE_REUSE);
+ bdb->addRef (lockType COMMA_ADD_HISTORY);
- /* Unlink its old incarnation from the page/hash table */
- for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr = &(*ptr)->hash)
- if (*ptr == bdb)
- {
- *ptr = bdb->hash;
- break;
- }
- else
- ASSERT (*ptr);
+ /* Set new page number and relink into hash table */
- }
-
- break;
- }
-#ifdef CHECK_STALLED_BDB
- bdb->stallCount = 0;
-#endif // CHECK_STALLED_BDB
+ bdb->hash = hashTable [slot];
+ hashTable [slot] = bdb;
+ bdb->pageNumber = pageNumber;
+ bdb->dbb = dbb;
#ifdef COLLECT_BDB_HISTORY
bdb->initHistory();
#endif
- bdb->addRef (Exclusive COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
return bdb;
}
void Cache::validate()
{
- //Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::validate");
-
- //bufferQueueLock.lock (Shared);
- // non-protected access to bufferQueue is DANGEROUS...
for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
{
//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -642,8 +474,8 @@ void Cache::validate()
void Cache::markDirty(Bdb *bdb)
{
- Sync dirtyLock (&syncDirty, "Cache::markDirty");
- dirtyLock.lock (Exclusive);
+ Sync sync (&syncDirty, "Cache::markDirty");
+ sync.lock (Exclusive);
bdb->nextDirty = NULL;
bdb->priorDirty = lastDirty;
@@ -653,21 +485,23 @@ void Cache::markDirty(Bdb *bdb)
firstDirty = bdb;
lastDirty = bdb;
+ ++numberDirtyPages;
//validateUnique (bdb);
}
void Cache::markClean(Bdb *bdb)
{
- Sync dirtyLock (&syncDirty, "Cache::markClean");
- dirtyLock.lock (Exclusive);
+ Sync sync (&syncDirty, "Cache::markClean");
+ sync.lock (Exclusive);
/***
if (bdb->flushIt)
Log::debug(" Cleaning page %d in %s marked for flush\n", bdb->pageNumber, (const char*) bdb->dbb->fileName);
***/
-
+
bdb->flushIt = false;
-
+ --numberDirtyPages;
+
if (bdb == lastDirty)
lastDirty = bdb->priorDirty;
@@ -709,7 +543,6 @@ void Cache::writePage(Bdb *bdb, int type
{
if (falcon_use_sectorcache)
sectorCache->writePage(bdb);
-
dbb->writePage(bdb, type);
}
catch (SQLException& exception)
@@ -762,7 +595,7 @@ void Cache::writePage(Bdb *bdb, int type
#endif
bdb->isDirty = false;
-
+
if (pageWriter && bdb->isRegistered)
{
bdb->isRegistered = false;
@@ -771,8 +604,8 @@ void Cache::writePage(Bdb *bdb, int type
if (dbb->shadows)
{
- Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
- cloneLock.lock (Shared);
+ Sync sync (&dbb->syncClone, "Cache::writePage(2)");
+ sync.lock (Shared);
for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
shadow->rewritePage(bdb);
@@ -781,49 +614,45 @@ void Cache::writePage(Bdb *bdb, int type
void Cache::analyze(Stream *stream)
{
- Sync dirtyLock (&syncDirty, "Cache::analyze");
+ Sync sync (&syncDirty, "Cache::analyze");
+ sync.lock (Shared);
int inUse = 0;
int dirty = 0;
int dirtyList = 0;
int total = 0;
Bdb *bdb;
- // non-protected access to bdbs,endBdbs is DANGEROUS...
for (bdb = bdbs; bdb < endBdbs; ++bdb)
{
++total;
-
+
if (bdb->isDirty)
++dirty;
-
+
if (bdb->useCount)
++inUse;
}
- dirtyLock.lock (Shared);
for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
++dirtyList;
- dirtyLock.unlock();
-
stream->format ("Cache: %d pages, %d in use, %d dirty, %d in dirty chain\n",
total, inUse, dirty, dirtyList);
}
void Cache::validateUnique(Bdb *target)
{
- int slot = PAGENUM_2_SLOT(target->pageNumber);
+ int slot = target->pageNumber % hashSize;
- // WARNING: unlocked walk of hash table.... DANGEROUS
for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber && bdb->dbb == target->dbb));
}
void Cache::freePage(Dbb *dbb, int32 pageNumber)
{
- int slot = PAGENUM_2_SLOT(pageNumber);
- Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::freePage");
- lockHash.lock(Shared);
+ Sync sync (&syncObject, "Cache::freePage");
+ sync.lock (Shared);
+ int slot = pageNumber % hashSize;
// If page exists in cache (usual case), clean it up
@@ -832,9 +661,10 @@ void Cache::freePage(Dbb *dbb, int32 pag
{
if (bdb->isDirty)
{
+ sync.unlock();
markClean (bdb);
}
-
+
bdb->isDirty = false;
break;
}
@@ -859,14 +689,12 @@ void Cache::flush(Dbb *dbb)
bool Cache::hasDirtyPages(Dbb *dbb)
{
- Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
- dirtyLock.lock (Shared);
+ Sync sync (&syncDirty, "Cache::hasDirtyPages");
+ sync.lock (Shared);
for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb == dbb)
- {
return true;
- }
return false;
}
@@ -887,22 +715,32 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
if (panicShutdown)
{
Thread *thread = Thread::getThread("Cache::trialFetch");
-
+
if (thread->pageMarks == 0)
throw SQLError(RUNTIME_ERROR, "Emergency shut is underway");
}
ASSERT (pageNumber >= 0);
+ int slot = pageNumber % hashSize;
+ Sync sync (&syncObject, "Cache::trialFetch");
+ sync.lock (Shared);
+ int hit = 0;
+
+ /* If we already have a buffer for this go, we're done */
+
Bdb *bdb;
- /* If we already have a buffer for this, we're done */
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
- if (bdb)
- {
- bdb->addRef(lockType COMMA_ADD_HISTORY);
- bdb->decrementUseCount(REL_HISTORY);
- moveToHead(bdb);
- }
+ for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+ {
+ //syncObject.validateShared("Cache::trialFetch");
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(lockType COMMA_ADD_HISTORY);
+ bdb->decrementUseCount(REL_HISTORY);
+ hit = 1;
+ break;
+ }
return bdb;
}
@@ -913,14 +751,13 @@ void Cache::syncFile(Dbb *dbb, const cha
int writes = dbb->writesSinceSync;
time_t start = database->timestamp;
dbb->sync();
-
+
if (Log::isActive(LogInfo))
{
time_t delta = database->timestamp - start;
-
+
if (delta > 1)
- Log::log(LogInfo, "%d: %s %s sync: %d pages in %d seconds\n",
- database->deltaTime, fileName, text, writes, delta);
+ Log::log(LogInfo, "%d: %s %s sync: %d pages in %d seconds\n", database->deltaTime, fileName, text, writes, delta);
}
}
@@ -931,186 +768,186 @@ void Cache::ioThread(void* arg)
void Cache::ioThread(void)
{
- Sync syncThread(&syncThreads, "Cache::ioThread");
+ Sync syncThread(&syncThreads, "Cache::ioThread(1)");
syncThread.lock(Shared);
- Sync flushLock(&syncFlush, "Cache::ioThread");
+ Sync flushLock(&syncFlush, "Cache::ioThread(2)");
+ Sync sync(&syncObject, "Cache::ioThread(3)");
Priority priority(database->ioScheduler);
Thread *thread = Thread::getThread("Cache::ioThread");
UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
UCHAR *buffer = (UCHAR*) (((UIPTR) rawBuffer + pageSize - 1) / pageSize * pageSize);
UCHAR *end = (UCHAR*) ((UIPTR) (rawBuffer + ASYNC_BUFFER_SIZE) / pageSize * pageSize);
flushLock.lock(Exclusive);
-
+
// This is the main loop. Write blocks until there's nothing to do, then sleep
-
+
for (;;)
{
int32 pageNumber = flushBitmap->nextSet(0);
int count;
-
+ Dbb *dbb;
+
if (pageNumber >= 0)
{
- Bdb *bdb;
- Dbb *dbb;
- int slot = PAGENUM_2_SLOT(pageNumber);
+ int slot = pageNumber % hashSize;
bool hit = false;
Bdb *bdbList = NULL;
UCHAR *p = buffer;
-
- // Look for the page to flush.
- bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
- if (bdb && bdb->flushIt && bdb->isDirty)
- {
- hit = true;
- count = 0;
- dbb = bdb->dbb;
-
- flushBitmap->clear(pageNumber);
-
- // get all his friends
- while (p < end)
+ sync.lock(Shared);
+
+ // Look for a page to flush. Then get all his friends
+
+ for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
+ if (bdb->pageNumber == pageNumber && bdb->flushIt && bdb->isDirty)
{
- ++count;
- bdb->addRef(Shared COMMA_ADD_HISTORY);
-
- bdb->syncWrite.lock(NULL, Exclusive);
- bdb->ioThreadNext = bdbList;
- bdbList = bdb;
-
- //ASSERT(!(bdb->flags & BDB_write_pending));
- //bdb->flags |= BDB_write_pending;
- memcpy(p, bdb->buffer, pageSize);
- p += pageSize;
- bdb->flushIt = false;
- markClean(bdb);
- bdb->isDirty = false;
- bdb->release(REL_HISTORY);
-
- bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
- if (!bdb)
- break;
-
- if (!bdb->isDirty && !continueWrite(bdb))
+ hit = true;
+ count = 0;
+ dbb = bdb->dbb;
+
+ if (!bdb->hash)
+ flushBitmap->clear(pageNumber);
+
+ while (p < end)
{
- bdb->decrementUseCount(REL_HISTORY);
- break;
+ ++count;
+ bdb->incrementUseCount(ADD_HISTORY);
+ sync.unlock();
+ bdb->addRef(Shared COMMA_ADD_HISTORY);
+ if (falcon_use_sectorcache)
+ sectorCache->writePage(bdb);
+
+ bdb->syncWrite.lock(NULL, Exclusive);
+ bdb->ioThreadNext = bdbList;
+ bdbList = bdb;
+
+ //ASSERT(!(bdb->flags & BDB_write_pending));
+ //bdb->flags |= BDB_write_pending;
+ memcpy(p, bdb->buffer, pageSize);
+ p += pageSize;
+ bdb->flushIt = false;
+ markClean(bdb);
+ bdb->isDirty = false;
+ bdb->release(REL_HISTORY);
+ sync.lock(Shared);
+
+ if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
+ break;
+
+ if (!bdb->isDirty && !continueWrite(bdb))
+ break;
}
- }
-
- flushLock.unlock();
- //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
- int length = p - buffer;
- priority.schedule(PRIORITY_LOW);
-
- try
- {
+
+ if (sync.state != None)
+ sync.unlock();
+
+ flushLock.unlock();
+ //Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
+ int length = p - buffer;
priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- }
- catch (SQLException& exception)
- {
- priority.finished();
-
- if (exception.getSqlcode() != DEVICE_FULL)
- throw;
-
- database->setIOError(&exception);
-
- for (bool error = true; error;)
+
+ try
+ {
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ }
+ catch (SQLException& exception)
{
- if (thread->shutdownInProgress)
+ priority.finished();
+
+ if (exception.getSqlcode() != DEVICE_FULL)
+ throw;
+
+ database->setIOError(&exception);
+
+ for (bool error = true; error;)
{
- Bdb *next;
+ if (thread->shutdownInProgress)
+ {
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+
+ return;
+ }
+
+ thread->sleep(1000);
+
+ try
{
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
+ priority.schedule(PRIORITY_LOW);
+ dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+ error = false;
+ database->clearIOError();
+ }
+ catch (SQLException& exception2)
+ {
+ priority.finished();
+
+ if (exception2.getSqlcode() != DEVICE_FULL)
+ throw;
}
-
- return;
- }
-
- thread->sleep(1000);
-
- try
- {
- priority.schedule(PRIORITY_LOW);
- dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
- error = false;
- database->clearIOError();
- }
- catch (SQLException& exception2)
- {
- priority.finished();
-
- if (exception2.getSqlcode() != DEVICE_FULL)
- throw;
}
}
- }
- priority.finished();
- Bdb *next;
+ priority.finished();
+ Bdb *next;
- for (bdb = bdbList; bdb; bdb = next)
- {
- //ASSERT(bdb->flags & BDB_write_pending);
- //bdb->flags &= ~BDB_write_pending;
- next = bdb->ioThreadNext;
- bdb->syncWrite.unlock();
- bdb->decrementUseCount(REL_HISTORY);
+ for (bdb = bdbList; bdb; bdb = next)
+ {
+ //ASSERT(bdb->flags & BDB_write_pending);
+ //bdb->flags &= ~BDB_write_pending;
+ next = bdb->ioThreadNext;
+ bdb->syncWrite.unlock();
+ bdb->decrementUseCount(REL_HISTORY);
+ }
+
+ flushLock.lock(Exclusive);
+ ++physicalWrites;
+
+ break;
}
-
- flushLock.lock(Exclusive);
- ++physicalWrites;
-
- }
- else
- {
- if (bdb)
- bdb->decrementUseCount(REL_HISTORY);
- }
-
- if (!hit)
+
+ if (!hit)
{
+ sync.unlock();
flushBitmap->clear(pageNumber);
}
}
- else
+ else
{
if (flushing)
{
int writes = physicalWrites;
int pages = flushPages;
int delta = (int) (database->timestamp - flushStart);
- int64 callbackArg = flushArg;
flushing = false;
- flushArg = 0;
flushLock.unlock();
syncWait.unlock();
-
+
if (writes > 0 && Log::isActive(LogInfo))
Log::log(LogInfo, "%d: Cache flush: %d pages, %d writes in %d seconds (%d pps)\n",
- database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
+ database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
- if (callbackArg != 0)
- database->pageCacheFlushed(callbackArg);
+ database->pageCacheFlushed(flushArg);
}
else
flushLock.unlock();
-
+
if (thread->shutdownInProgress)
break;
thread->sleep();
flushLock.lock(Exclusive);
}
- } // for ever
-
- delete [] rawBuffer;
+ }
+
+ delete [] rawBuffer;
}
bool Cache::continueWrite(Bdb* startingBdb)
@@ -1118,26 +955,23 @@ bool Cache::continueWrite(Bdb* startingB
Dbb *dbb = startingBdb->dbb;
int clean = 1;
int dirty = 0;
-
+
for (int32 pageNumber = startingBdb->pageNumber + 1, end = pageNumber+ 5; pageNumber < end; ++pageNumber)
{
- Bdb *bdb;
-
+ Bdb *bdb = findBdb(dbb, pageNumber);
+
if (dirty > clean)
return true;
-
- bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+
if (!bdb)
return dirty >= clean;
-
+
if (bdb->isDirty)
++dirty;
else
++clean;
-
- bdb->decrementUseCount(REL_HISTORY);
}
-
+
return (dirty >= clean);
}
@@ -1164,97 +998,77 @@ void Cache::shutdownThreads(void)
ioThreads[n]->shutdown();
ioThreads[n] = 0;
}
-
- Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
- lockThreads.lock(Exclusive);
+
+ Sync sync(&syncThreads, "Cache::shutdownThreads");
+ sync.lock(Exclusive);
}
-#ifdef CACHE_TRACE_FILE
void Cache::analyzeFlush(void)
{
Dbb *dbb = NULL;
Bdb *bdb;
- Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
-
- dirtyLock.lock (Shared);
+
for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
if (bdb->dbb->tableSpaceId == 1)
{
dbb = bdb->dbb;
-
+
break;
}
- dirtyLock.unlock();
-
+
if (!dbb)
return;
-
+
fprintf(traceFile, "-------- time %d -------\n", database->deltaTime);
for (int pageNumber = 0; (pageNumber = flushBitmap->nextSet(pageNumber)) >= 0;)
- // non-protected access to hash table via findBdb()!
if ( (bdb = findBdb(dbb, pageNumber)) )
{
int start = pageNumber;
int type = bdb->buffer->pageType;
-
- // non-protected access to hash table via findBdb()!
+
for (; (bdb = findBdb(dbb, ++pageNumber)) && bdb->flushIt;)
;
-
+
fprintf(traceFile, " %d flushed: %d to %d, first type %d\n", pageNumber - start, start, pageNumber - 1, type);
-
- // non-protected access to hash table via findBdb()!
+
for (int max = pageNumber + 5; pageNumber < max && (bdb = findBdb(dbb, pageNumber)) && !bdb->flushIt; ++pageNumber)
{
if (bdb->isDirty)
fprintf(traceFile, " %d dirty not flushed, type %d \n", pageNumber, bdb->buffer->pageType);
else
- fprintf(traceFile, " %d not dirty, type %d\n", pageNumber, bdb->buffer->pageType);
+ fprintf(traceFile," %d not dirty, type %d\n", pageNumber, bdb->buffer->pageType);
}
}
else
++pageNumber;
-
- fflush(traceFile);
+
+ fflush(traceFile);
}
void Cache::openTraceFile(void)
{
+#ifdef TRACE_FILE
if (traceFile)
closeTraceFile();
-
- traceFile = fopen(TRACE_FILE, "a+");
- fprintf(traceFile, "Starting\n");
-//KEL
-// setvbuf(traceFile, (char *) NULL, _IOLBF, 0);
-
+
+ traceFile = fopen(TRACE_FILE, "w");
+#endif
}
void Cache::closeTraceFile(void)
{
+#ifdef TRACE_FILE
if (traceFile)
{
fclose(traceFile);
traceFile = NULL;
}
+#endif
}
-#else // CACHE_TRACE_FILE
-void Cache::analyzeFlush(void)
-{
-}
-
-void Cache::openTraceFile(void)
-{
-}
-
-void Cache::closeTraceFile(void)
-{
-}
-#endif // CACHE_TRACE_FILE
void Cache::flushWait(void)
{
- Sync waitLock(&syncWait, "Cache::flushWait");
- waitLock.lock(Exclusive);
+ Sync sync(&syncWait, "Cache::flushWait");
+ sync.lock(Shared);
}
=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h 2008-09-02 16:08:11 +0000
+++ b/storage/falcon/Cache.h 2008-10-02 22:15:11 +0000
@@ -28,17 +28,6 @@
#include "SyncObject.h"
#include "Queue.h"
-// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
-//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
-# define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
-# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
-# define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
-
-#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
-
class Bdb;
class Dbb;
class PageWriter;
@@ -65,6 +54,7 @@ public:
void markClean (Bdb *bdb);
void markDirty (Bdb *bdb);
void validate();
+ void moveToHead (Bdb *bdb);
void flush(int64 arg);
void validateCache(void);
void syncFile(Dbb *dbb, const char *text);
@@ -93,20 +83,14 @@ public:
bool flushing;
protected:
- void moveToHead (Bdb *bdb);
- void moveToHeadAlreadyLocked (Bdb *bdb);
- Bdb* getFreeBuffer(void);
- Bdb* findBdb(Dbb* dbb, int32 pageNumber, int slot);
+ Bdb* findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
Bdb* findBdb(Dbb* dbb, int32 pageNumber);
- Bdb* lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
- Bdb* lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
int64 flushArg;
Bdb *bdbs;
Bdb *endBdbs;
Queue<Bdb> bufferQueue;
Bdb **hashTable;
- SyncObject *syncHashTable;
Bdb *firstDirty;
Bdb *lastDirty;
Bitmap *flushBitmap;
@@ -121,13 +105,12 @@ protected:
int flushPages;
int physicalWrites;
int hashSize;
- unsigned int hashMask;
int pageSize;
- unsigned int upperFraction;
+ int upperFraction;
int numberHunks;
+ int numberDirtyPages;
int numberIoThreads;
- volatile uint64 bufferAge;
-
+ volatile int bufferAge;
public:
void flushWait(void);
};
=== modified file 'storage/falcon/Database.cpp'
--- a/storage/falcon/Database.cpp 2008-09-11 10:56:00 +0000
+++ b/storage/falcon/Database.cpp 2008-09-30 18:17:19 +0000
@@ -686,7 +686,6 @@ void Database::createDatabase(const char
deleteFilesOnExit = true;
throw;
}
-
}
void Database::openDatabase(const char * filename)
@@ -1455,50 +1454,61 @@ void Database::dropTable(Table *table, T
void Database::truncateTable(Table *table, Sequence *sequence, Transaction *transaction)
{
- Sync syncDDL(&syncSysDDL, "Database::truncateTable(1)");
- syncDDL.lock(Exclusive);
-
- table->checkDrop();
-
// Check for records in active transactions
if (hasUncommittedRecords(table, transaction))
throw SQLError(UNCOMMITTED_UPDATES, "table %s.%s has uncommitted updates and cannot be truncated",
table->schemaName, table->name);
-
- // Block table drop/add, table list scans ok
-
- Sync syncTbl(&syncTables, "Database::truncateTable(2)");
- syncTbl.lock(Shared);
+
+ // Lock SystemDDL first. This lock can happen multiple times in many call stacks,
+ // both before and after the following locks. So it is important that we get an
+ // exclusive lock first.
+
+ Sync syncDDLLock(&syncSysDDL, "Database::truncateTable(SysDDL)");
+ syncDDLLock.lock(Exclusive);
+
+ // Lock syncScavenge before locking syncSysDDL, syncTables, or table->syncObject.
+ // The scavenger locks syncScavenge and then syncTables
+ // If we run out of record memory, forceRecordScavenge will eventually call table->syncObject.
+
+ Sync syncScavengeLock(&syncScavenge, "Database::truncateTable(scavenge)");
+ syncScavengeLock.lock(Exclusive);
+
+ table->checkDrop();
+ // Block table drop/add, table list scans ok
+
+ Sync syncTablesLock(&syncTables, "Database::truncateTable(tables)");
+ syncTablesLock.lock(Shared);
+
//Lock sections (factored out of SRLDropTable to avoid a deadlock)
//The lock order (serialLog->syncSections before table->syncObject) is
//important
- Sync syncSections(&serialLog->syncSections, "Database::truncateTable(3)");
- syncSections.lock(Exclusive);
-
+ Sync syncSectionsLock(&serialLog->syncSections, "Database::truncateTable(sections)");
+ syncSectionsLock.lock(Exclusive);
+
// No table access until truncate completes
-
- Sync syncObj(&table->syncObject, "Database::truncateTable(4)");
- syncObj.lock(Exclusive);
-
+
+ Sync syncTableLock(&table->syncObject, "Database::truncateTable(table)");
+ syncTableLock.lock(Exclusive);
+
table->deleting = true;
-
+
// Purge records out of committed transactions
-
+
transactionManager->truncateTable(table, transaction);
-
+
Transaction *sysTransaction = getSystemTransaction();
-
+
// Recreate data/blob sections and indexes
-
+
table->truncate(sysTransaction);
-
+
commitSystemTransaction();
-
+
// Delete and recreate the sequence
-
+
if (sequence)
sequence = sequence->recreate();
}
=== modified file 'storage/falcon/DeferredIndex.cpp'
--- a/storage/falcon/DeferredIndex.cpp 2008-09-10 19:51:03 +0000
+++ b/storage/falcon/DeferredIndex.cpp 2008-10-04 00:10:34 +0000
@@ -819,7 +819,7 @@ void DeferredIndex::scanIndex(IndexKey *
void DeferredIndex::detachIndex(void)
{
Sync sync(&syncObject, "DeferredIndex::detachIndex");
- sync.lock(Shared);
+ sync.lock(Exclusive); // was Shared
index = NULL;
}
@@ -884,11 +884,7 @@ void DeferredIndex::addRef()
void DeferredIndex::releaseRef()
{
- ASSERT(useCount > 0);
-
- INTERLOCKED_DECREMENT(useCount);
-
- if (useCount == 0)
+ if (INTERLOCKED_DECREMENT(useCount) == 0)
delete this;
}
=== modified file 'storage/falcon/SRLUpdateIndex.cpp'
--- a/storage/falcon/SRLUpdateIndex.cpp 2008-07-15 18:57:27 +0000
+++ b/storage/falcon/SRLUpdateIndex.cpp 2008-10-04 00:10:34 +0000
@@ -40,14 +40,29 @@ SRLUpdateIndex::~SRLUpdateIndex(void)
void SRLUpdateIndex::append(DeferredIndex* deferredIndex)
{
+ uint indexId;
+ int idxVersion;
+ int tableSpaceId;
+
+ Sync syncDI(&deferredIndex->syncObject, "SRLUpdateIndex::append");
+ syncDI.lock(Shared);
+
+ if (!deferredIndex->index)
+ return;
+ else
+ {
+ indexId = deferredIndex->index->indexId;
+ idxVersion = deferredIndex->index->indexVersion;
+ tableSpaceId = deferredIndex->index->dbb->tableSpaceId;
+ }
+
+ syncDI.unlock();
+
Sync syncIndexes(&log->syncIndexes, "SRLUpdateIndex::append(1)");
syncIndexes.lock(Shared);
Transaction *transaction = deferredIndex->transaction;
DeferredIndexWalker walker(deferredIndex, NULL);
- uint indexId = deferredIndex->index->indexId;
- int idxVersion = deferredIndex->index->indexVersion;
- int tableSpaceId = deferredIndex->index->dbb->tableSpaceId;
uint64 virtualOffset = 0;
uint64 virtualOffsetAtEnd = 0;
=== modified file 'storage/falcon/SerialLogWindow.cpp'
--- a/storage/falcon/SerialLogWindow.cpp 2008-03-11 16:15:47 +0000
+++ b/storage/falcon/SerialLogWindow.cpp 2008-10-02 09:01:39 +0000
@@ -117,7 +117,7 @@ SerialLogBlock* SerialLogWindow::readFir
void SerialLogWindow::write(SerialLogBlock *block)
{
uint32 length = ROUNDUP(block->length, sectorSize);
- uint32 offset = (int) (origin + ((UCHAR*) block - buffer));
+ int64 offset = origin + ((UCHAR*) block - buffer);
ASSERT(length <= bufferLength);
try
=== modified file 'storage/falcon/StorageHandler.cpp'
--- a/storage/falcon/StorageHandler.cpp 2008-09-11 10:56:00 +0000
+++ b/storage/falcon/StorageHandler.cpp 2008-10-01 03:13:44 +0000
@@ -61,11 +61,11 @@ static const char *createTempSpace = "up
static const char *falconSchema [] = {
//"create tablespace " DEFAULT_TABLESPACE " filename '" FALCON_USER "' allocation 2000000000",
createTempSpace,
-
+
"upgrade table falcon.tablespaces ("
" name varchar(128) not null primary key,"
" pathname varchar(1024) not null)",
-
+
"upgrade table falcon.tables ("
" given_schema_name varchar(128) not null,"
" effective_schema_name varchar(128) not null,"
@@ -73,11 +73,11 @@ static const char *falconSchema [] = {
" effective_table_name varchar(128) not null,"
" tablespace_name varchar(128) not null,"
" pathname varchar(1024) not null primary key)",
-
+
"upgrade unique index effective on falcon.tables (effective_schema_name, effective_table_name)",
NULL };
-
+
class Server;
extern Server* startServer(int port, const char *configFile);
@@ -95,13 +95,13 @@ static const char THIS_FILE[]=__FILE__;
int init()
{
const char *p;
-
+
for (p = WHITE_SPACE; *p; p++)
charTable[(unsigned char)*p] = 1;
-
+
for (p = PUNCTUATION_CHARS; *p; p++)
charTable[(unsigned char)*p] = 1;
-
+
return 1;
}
@@ -109,7 +109,7 @@ StorageHandler* getFalconStorageHandler(
{
if (!storageHandler)
storageHandler = new StorageHandler(lockSize);
-
+
return storageHandler;
}
@@ -151,7 +151,7 @@ StorageHandler::~StorageHandler(void)
storageDatabases[n] = storageDatabase->collision;
delete storageDatabase;
}
-
+
for (int n = 0; n < tableHashSize; ++n)
for (StorageTableShare *table; (table = tables[n]);)
{
@@ -190,11 +190,11 @@ void StorageHandler::shutdownHandler(voi
dictionaryConnection->close();
dictionaryConnection = NULL;
}
-
+
for (int n = 0; n < databaseHashSize; ++n)
for (StorageDatabase *storageDatabase = storageDatabases[n]; storageDatabase; storageDatabase = storageDatabase->collision)
storageDatabase->close();
-
+
/***
Configuration configuration(NULL);
Connection *connection = new Connection(&configuration);
@@ -207,14 +207,14 @@ void StorageHandler::databaseDropped(Sto
{
if (!storageDatabase && storageConnection)
storageDatabase = storageConnection->storageDatabase;
-
+
if (storageDatabase)
{
Sync syncHash(&hashSyncObject, "StorageHandler::databaseDropped(1)");
int slot = JString::hash(storageDatabase->name, databaseHashSize);
syncHash.lock(Exclusive);
StorageDatabase **ptr;
-
+
for (ptr = storageDatabases + slot; *ptr; ptr = &(*ptr)->collision)
if (*ptr == storageDatabase)
{
@@ -240,7 +240,7 @@ void StorageHandler::databaseDropped(Sto
for (StorageConnection *cnct = connections[n]; cnct; cnct = cnct->collision)
if (cnct != storageConnection)
cnct->databaseDropped(storageDatabase);
-
+
sync.unlock();
}
@@ -256,8 +256,8 @@ int StorageHandler::startTransaction(THD
Sync sync(&syncObject, "StorageHandler::commit");
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
-
- for (StorageConnection *storageConnection = connections[slot];
+
+ for (StorageConnection *storageConnection = connections[slot];
storageConnection; storageConnection = storageConnection->collision)
{
if (storageConnection->mySqlThread == mySqlThread)
@@ -276,16 +276,16 @@ int StorageHandler::commit(THD* mySqlThr
Sync sync(&syncObject, "StorageHandler::commit");
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
-
+
for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
if (connection->mySqlThread == mySqlThread)
{
int ret =connection->commit();
-
+
if (ret)
return ret;
}
-
+
return 0;
}
@@ -294,16 +294,16 @@ int StorageHandler::prepare(THD* mySqlTh
Sync sync(&syncObject, "StorageHandler::prepare");
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
-
+
for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
if (connection->mySqlThread == mySqlThread)
{
int ret = connection->prepare(xidSize, xid);
-
+
if (ret)
return ret;
}
-
+
return 0;
}
@@ -312,16 +312,16 @@ int StorageHandler::rollback(THD* mySqlT
Sync sync(&syncObject, "StorageHandler::rollback");
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
-
+
for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
if (connection->mySqlThread == mySqlThread)
{
int ret = connection->rollback();
-
+
if (ret)
return ret;
}
-
+
return 0;
}
@@ -330,11 +330,11 @@ int StorageHandler::releaseVerb(THD* myS
Sync sync(&syncObject, "StorageHandler::releaseVerb");
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
-
+
for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
if (connection->mySqlThread == mySqlThread)
connection->releaseVerb();
-
+
return 0;
}
@@ -343,11 +343,11 @@ int StorageHandler::rollbackVerb(THD* my
Sync sync(&syncObject, "StorageHandler::rollbackVerb");
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
-
+
for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
if (connection->mySqlThread == mySqlThread)
connection->rollbackVerb();
-
+
return 0;
}
@@ -357,7 +357,7 @@ int StorageHandler::savepointSet(THD* my
sync.lock(Shared);
int slot = HASH(mySqlThread, connectionHashSize);
StorageSavepoint *savepoints = NULL;
-
+
for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
if (connection->mySqlThread == mySqlThread)
{
@@ -367,9 +367,9 @@ int StorageHandler::savepointSet(THD* my
savepoint->storageConnection = connection;
savepoint->savepoint = connection->savepointSet();
}
-
+
*((void**) savePoint) = savepoints;
-
+
return 0;
}
@@ -377,15 +377,15 @@ int StorageHandler::savepointRelease(THD
{
Sync sync(&syncObject, "StorageHandler::savepointRelease");
sync.lock(Shared);
-
- for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint;
+
+ for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint;
(savepoint = savepoints);)
{
savepoint->storageConnection->savepointRelease(savepoint->savepoint);
savepoints = savepoint->next;
delete savepoint;
}
-
+
*((void**) savePoint) = NULL;
return 0;
@@ -395,15 +395,15 @@ int StorageHandler::savepointRollback(TH
{
Sync sync(&syncObject, "StorageHandler::savepointRollback");
sync.lock(Shared);
-
- for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint;
+
+ for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint;
(savepoint = savepoints);)
{
savepoint->storageConnection->savepointRollback(savepoint->savepoint);
savepoints = savepoint->next;
delete savepoint;
}
-
+
*((void**) savePoint) = NULL;
return 0;
@@ -414,22 +414,22 @@ StorageDatabase* StorageHandler::getStor
Sync sync(&hashSyncObject, "StorageHandler::getStorageDatabase");
int slot = JString::hash(dbName, databaseHashSize);
StorageDatabase *storageDatabase;
-
+
if (storageDatabases[slot])
{
sync.lock(Shared);
-
+
if ( (storageDatabase = findDatabase(dbName)) )
return storageDatabase;
-
+
sync.unlock();
}
-
+
sync.lock(Exclusive);
if ( (storageDatabase = findDatabase(dbName)) )
return storageDatabase;
-
+
storageDatabase = new StorageDatabase(this, dbName, path);
storageDatabase->load();
storageDatabase->collision = storageDatabases[slot];
@@ -437,7 +437,7 @@ StorageDatabase* StorageHandler::getStor
storageDatabase->addRef();
storageDatabase->next = databaseList;
databaseList = storageDatabase;
-
+
return storageDatabase;
}
@@ -446,7 +446,7 @@ void StorageHandler::closeDatabase(const
Sync sync(&hashSyncObject, "StorageHandler::closeDatabase");
int slot = JString::hash(path, databaseHashSize);
sync.lock(Exclusive);
-
+
for (StorageDatabase *storageDatabase, **ptr = storageDatabases + slot; (storageDatabase = *ptr); ptr = &storageDatabase->collision)
if (storageDatabase->filename == path)
{
@@ -505,9 +505,7 @@ int StorageHandler::createTablespace(con
return StorageErrorTableSpaceExist;
}
- JString tableSpace = JString::upcase(tableSpaceName);
-
- TableSpaceManager *tableSpaceManager =
+ TableSpaceManager *tableSpaceManager =
dictionaryConnection->database->tableSpaceManager;
if (!tableSpaceManager->waitForPendingDrop(filename, 10))
@@ -527,16 +525,16 @@ int StorageHandler::createTablespace(con
{
if (exception.getSqlcode() == TABLESPACE_EXIST_ERROR)
return StorageErrorTableSpaceExist;
-
+
if (exception.getSqlcode() == TABLESPACE_NOT_EXIST_ERROR)
return StorageErrorTableSpaceNotExist;
if (exception.getSqlcode() == TABLESPACE_DATAFILE_EXIST_ERROR)
return StorageErrorTableSpaceDataFileExist;
-
+
return StorageErrorTablesSpaceOperationFailed;
}
-
+
return 0;
}
@@ -554,7 +552,7 @@ int StorageHandler::deleteTablespace(con
{
return StorageErrorTablesSpaceOperationFailed;
}
-
+
try
{
CmdGen gen;
@@ -568,16 +566,16 @@ int StorageHandler::deleteTablespace(con
catch (SQLException& exception)
{
int sqlCode = exception.getSqlcode();
-
+
if (sqlCode == TABLESPACE_NOT_EXIST_ERROR)
return StorageErrorTableSpaceNotExist;
-
+
if (sqlCode == TABLESPACE_NOT_EMPTY)
return StorageErrorTableNotEmpty;
-
+
return StorageErrorTablesSpaceOperationFailed;
}
-
+
return 0;
}
@@ -592,26 +590,26 @@ StorageTableShare* StorageHandler::findT
if (tables[slot])
{
sync.lock(Shared);
-
+
for (tableShare = tables[slot]; tableShare; tableShare = tableShare->collision)
if (tableShare->pathName == filename)
return tableShare;
-
+
sync.unlock();
}
sync.lock(Exclusive);
-
+
for (tableShare = tables[slot]; tableShare; tableShare = tableShare->collision)
if (tableShare->pathName == filename)
return tableShare;
-
+
tableShare = new StorageTableShare(this, filename, NULL, mySqlLockSize, false);
tableShare->collision = tables[slot];
tables[slot] = tableShare;
-
+
ASSERT(tableShare->collision != tableShare);
-
+
return tableShare;
}
@@ -632,7 +630,7 @@ StorageTableShare* StorageHandler::preDe
{
Sync sync(&hashSyncObject, "StorageHandler::preDeleteTable");
sync.lock(Shared);
-
+
for (tableShare = tables[slot]; tableShare; tableShare = tableShare->collision)
if (tableShare->pathName == filename)
return tableShare;
@@ -643,14 +641,14 @@ StorageTableShare* StorageHandler::preDe
tableShare = new StorageTableShare(this, filename, NULL, mySqlLockSize, false);
JString path = tableShare->lookupPathName();
delete tableShare;
-
+
if (path == pathname)
return findTable(pathname);
}
catch (...)
{
}
-
+
return NULL;
}
@@ -663,17 +661,17 @@ StorageTableShare* StorageHandler::creat
return NULL;
StorageTableShare *tableShare = new StorageTableShare(this, pathname, tableSpaceName, mySqlLockSize, tempTable);
-
+
if (tableShare->tableExists())
{
delete tableShare;
-
+
return NULL;
}
addTable(tableShare);
tableShare->registerTable();
-
+
return tableShare;
}
@@ -684,7 +682,7 @@ void StorageHandler::addTable(StorageTab
sync.lock(Exclusive);
table->collision = tables[slot];
tables[slot] = table;
-
+
ASSERT(table->collision != table);
}
@@ -693,7 +691,7 @@ void StorageHandler::removeTable(Storage
Sync sync(&hashSyncObject, "StorageHandler::removeTable");
sync.lock(Exclusive);
int slot = JString::hash(table->pathName, tableHashSize);
-
+
for (StorageTableShare **ptr = tables + slot; *ptr; ptr = &(*ptr)->collision)
if (*ptr == table)
{
@@ -705,7 +703,7 @@ void StorageHandler::removeTable(Storage
StorageConnection* StorageHandler::getStorageConnection(StorageTableShare* tableShare, THD* mySqlThread, int mySqlThdId, OpenOption createFlag)
{
Sync sync(&syncObject, "StorageHandler::getStorageConnection");
-
+
if (!defaultDatabase)
initialize();
@@ -727,10 +725,10 @@ StorageConnection* StorageHandler::getSt
if (storageConnection->mySqlThread == mySqlThread) // && storageConnection->storageDatabase == tableShare->storageDatabase)
{
storageConnection->addRef();
-
+
if (!tableShare->storageDatabase)
tableShare->setDatabase(storageDatabase);
-
+
return storageConnection;
}
@@ -743,16 +741,16 @@ StorageConnection* StorageHandler::getSt
if (storageConnection->mySqlThread == mySqlThread) // && storageConnection->storageDatabase == tableShare->storageDatabase)
{
storageConnection->addRef();
-
+
if (!tableShare->storageDatabase)
tableShare->setDatabase(storageDatabase);
-
+
return storageConnection;
}
-
+
storageConnection = new StorageConnection(this, storageDatabase, mySqlThread, mySqlThdId);
bool success = false;
-
+
if (createFlag != CreateDatabase) // && createFlag != OpenTemporaryDatabase)
try
{
@@ -763,15 +761,15 @@ StorageConnection* StorageHandler::getSt
{
//fprintf(stderr, "database open failed: %s\n", exception.getText());
storageConnection->setErrorText(exception.getText());
-
+
if (createFlag == OpenDatabase)
{
delete storageConnection;
-
+
return NULL;
}
}
-
+
if (!success && createFlag != OpenDatabase)
try
{
@@ -780,29 +778,29 @@ StorageConnection* StorageHandler::getSt
catch (SQLException&)
{
delete storageConnection;
-
+
return NULL;
}
-
+
tableShare->setDatabase(storageDatabase);
storageConnection->collision = connections[slot];
connections[slot] = storageConnection;
-
+
return storageConnection;
}
StorageDatabase* StorageHandler::findDatabase(const char* dbName)
{
int slot = JString::hash(dbName, databaseHashSize);
-
+
for (StorageDatabase *storageDatabase = storageDatabases[slot]; storageDatabase; storageDatabase = storageDatabase->collision)
if (storageDatabase->name == dbName)
{
storageDatabase->addRef();
-
+
return storageDatabase;
}
-
+
return NULL;
}
@@ -820,7 +818,7 @@ void StorageHandler::changeMySqlThread(S
void StorageHandler::removeConnection(StorageConnection* storageConnection)
{
int slot = HASH(storageConnection->mySqlThread, connectionHashSize);
-
+
for (StorageConnection **ptr = connections + slot; *ptr; ptr = &(*ptr)->collision)
if (*ptr == storageConnection)
{
@@ -838,7 +836,7 @@ int StorageHandler::closeConnections(THD
for (StorageConnection *storageConnection = connections[slot], *next; storageConnection; storageConnection = next)
{
next = storageConnection->collision;
-
+
if (storageConnection->mySqlThread == thd)
{
sync.unlock();
@@ -847,7 +845,7 @@ int StorageHandler::closeConnections(THD
if (storageConnection->mySqlThread)
storageConnection->release(); // This is for thd->ha_data[falcon_hton->slot]
-
+
storageConnection->release(); // This is for storageConn
}
}
@@ -861,25 +859,25 @@ int StorageHandler::dropDatabase(const c
char pathname[FILENAME_MAX];
const char *SEPARATOR = pathname;
char *q = pathname;
-
+
for (const char *p = path; *p;)
{
char c = *p++;
-
+
if (c == '/')
{
if (*p == 0)
break;
-
+
SEPARATOR = q + 1;
}
-
+
*q++ = c;
}
-
+
*q = 0;
JString dbName = JString::upcase(SEPARATOR);
- strcpy(q, StorageTableShare::getDefaultRoot());
+ strcpy(q, StorageTableShare::getDefaultRoot());
StorageDatabase *storageDatabase = getStorageDatabase(dbName, pathname);
databaseDropped(storageDatabase, NULL);
@@ -893,7 +891,7 @@ int StorageHandler::dropDatabase(const c
storageDatabase->release();
***/
-
+
return 0;
}
@@ -901,7 +899,7 @@ void StorageHandler::getIOInfo(InfoTable
{
Sync sync(&hashSyncObject, "StorageHandler::getIOInfo");
sync.lock(Shared);
-
+
for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
storageDatabase->getIOInfo(infoTable);
}
@@ -930,7 +928,7 @@ void StorageHandler::getTransactionInfo(
{
Sync sync(&hashSyncObject, "StorageHandler::getTransactionInfo");
sync.lock(Shared);
-
+
for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
storageDatabase->getTransactionInfo(infoTable);
}
@@ -939,7 +937,7 @@ void StorageHandler::getSerialLogInfo(In
{
Sync sync(&hashSyncObject, "StorageHandler::getSerialLogInfo");
sync.lock(Shared);
-
+
for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
storageDatabase->getSerialLogInfo(infoTable);
}
@@ -953,7 +951,7 @@ void StorageHandler::getTransactionSumma
{
Sync sync(&hashSyncObject, "StorageHandler::getTransactionSummaryInfo");
sync.lock(Shared);
-
+
for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
storageDatabase->getTransactionSummaryInfo(infoTable);
}
@@ -962,7 +960,7 @@ void StorageHandler::getTableSpaceInfo(I
{
Sync sync(&hashSyncObject, "StorageHandler::getTableSpaceInfo");
sync.lock(Shared);
-
+
for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
storageDatabase->getTableSpaceInfo(infoTable);
}
@@ -971,7 +969,7 @@ void StorageHandler::getTableSpaceFilesI
{
Sync sync(&hashSyncObject, "StorageHandler::getTableSpaceFilesInfo");
sync.lock(Shared);
-
+
for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
storageDatabase->getTableSpaceFilesInfo(infoTable);
}
@@ -980,16 +978,16 @@ void StorageHandler::initialize(void)
{
if (initialized)
return;
-
+
Sync sync(&syncObject, "StorageHandler::initialize");
sync.lock(Exclusive);
-
+
if (initialized)
return;
-
+
initialized = true;
defaultDatabase = getStorageDatabase(MASTER_NAME, MASTER_PATH);
-
+
try
{
defaultDatabase->getOpenConnection();
@@ -1001,7 +999,7 @@ void StorageHandler::initialize(void)
{
int err = e.getSqlcode();
- // If got one of following errors, just rethrow. No point in
+ // If got one of following errors, just rethrow. No point in
// trying to create database.
if (err != OPEN_MASTER_ERROR)
throw;
@@ -1046,14 +1044,14 @@ void StorageHandler::createDatabase(void
void StorageHandler::dropTempTables(void)
{
Statement *statement = dictionaryConnection->createStatement();
-
+
try
{
PStatement select = dictionaryConnection->prepareStatement(
"select schema,tablename from system.tables where tablespace='" TEMPORARY_TABLESPACE "'");
RSet resultSet = select->executeQuery();
bool hit = false;
-
+
while (resultSet->next())
{
CmdGen gen;
@@ -1061,14 +1059,14 @@ void StorageHandler::dropTempTables(void
statement->executeUpdate(gen.getString());
hit = true;
}
-
+
//if (hit)
//statement->executeUpdate(dropTempSpace);
}
catch(...)
{
}
-
+
try
{
statement->executeUpdate(createTempSpace);
@@ -1077,7 +1075,7 @@ void StorageHandler::dropTempTables(void
{
Log::log("Can't create temporary tablespace: %s\n", exception.getText());
}
-
+
statement->close();
}
@@ -1118,7 +1116,7 @@ void StorageHandler::cleanFileName(const
char *q = filename;
char *end = filename + filenameLength - 1;
filename[0] = 0;
-
+
for (const char *p = pathname; q < end && (c = *p++); prior = c)
if (c != SEPARATOR || c != prior)
*q++ = c;
@@ -1136,7 +1134,7 @@ void StorageHandler::getFalconVersionInf
int StorageHandler::recoverGetNextLimbo(int xidLength, unsigned char* xid)
{
- if (!defaultDatabase)
+ if (!defaultDatabase)
initialize();
if (Connection* connection = dictionaryConnection)
@@ -1149,14 +1147,14 @@ const char* StorageHandler::normalizeNam
{
char *q = buffer;
char *end = buffer + bufferSize - 1;
-
+
for (const char *p = name; *p && q < end; ++p)
if (charTable[(unsigned char)*p])
return name;
else
*q++ = UPPER(*p);
-
+
*q = 0;
-
+
return buffer;
}
=== modified file 'storage/falcon/StorageVersion.h'
--- a/storage/falcon/StorageVersion.h 2008-05-22 13:31:03 +0000
+++ b/storage/falcon/StorageVersion.h 2008-10-06 08:40:27 +0000
@@ -14,5 +14,5 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#define FALCON_VERSION "T1.2-5"
-#define FALCON_DATE "22 May, 2008"
+#define FALCON_VERSION "T1.2-6"
+#define FALCON_DATE "06 October, 2008"
=== modified file 'storage/falcon/Table.cpp'
--- a/storage/falcon/Table.cpp 2008-09-08 11:51:19 +0000
+++ b/storage/falcon/Table.cpp 2008-10-03 23:56:24 +0000
@@ -2117,7 +2117,10 @@ void Table::garbageCollect(Record *leavi
if (!leaving && !staying)
return;
- Sync syncPrior(getSyncPrior(leaving ? leaving : staying), "Table::garbageCollect");
+ Sync sync (&syncObject, "Table::garbageCollect(1)");
+ sync.lock(Shared);
+
+ Sync syncPrior(getSyncPrior(leaving ? leaving : staying), "Table::garbageCollect(2)");
syncPrior.lock(Shared);
// Clean up field indexes
=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp 2008-09-10 19:51:03 +0000
+++ b/storage/falcon/Transaction.cpp 2008-10-02 23:51:36 +0000
@@ -1076,14 +1076,10 @@ void Transaction::addRef()
INTERLOCKED_INCREMENT(useCount);
}
-int Transaction::release()
+void Transaction::release()
{
- int count = INTERLOCKED_DECREMENT(useCount);
-
- if (count == 0)
+ if (INTERLOCKED_DECREMENT(useCount) == 0)
delete this;
-
- return count;
}
int Transaction::createSavepoint()
@@ -1317,7 +1313,7 @@ void Transaction::add(DeferredIndex* def
Sync sync(&syncDeferredIndexes, "Transaction::add");
sync.lock(Exclusive);
- deferredIndex->addRef();
+// deferredIndex->addRef(); // temporarily disabled for Bug#39711
deferredIndex->nextInTransaction = deferredIndexes;
deferredIndexes = deferredIndex;
deferredIndexCount++;
=== modified file 'storage/falcon/Transaction.h'
--- a/storage/falcon/Transaction.h 2008-09-10 04:02:07 +0000
+++ b/storage/falcon/Transaction.h 2008-10-02 23:20:47 +0000
@@ -99,7 +99,7 @@ public:
void prepare(int xidLength, const UCHAR *xid);
void rollback();
void commit();
- int release();
+ void release();
void addRef();
void waitForTransaction();
bool waitForTransaction (TransId transId);
=== modified file 'storage/falcon/ha_falcon.cpp'
--- a/storage/falcon/ha_falcon.cpp 2008-09-16 17:58:49 +0000
+++ b/storage/falcon/ha_falcon.cpp 2008-10-03 05:15:40 +0000
@@ -2180,46 +2180,6 @@ int StorageInterface::check_if_supported
}
}
- // TODO for Add Index:
- // 1. Check for supported ALTER combinations
- // 2. Can error message be improved for non-null columns?
-
- if (alter_flags->is_set(HA_ADD_INDEX) || alter_flags->is_set(HA_ADD_UNIQUE_INDEX))
- {
- for (unsigned int n = 0; n < altered_table->s->keys; n++)
- {
- if (n != altered_table->s->primary_key)
- {
- KEY *key = altered_table->key_info + n;
- KEY *tableEnd = table->key_info + table->s->keys;
- KEY *tableKey;
-
- // Determine if this is a new index
-
- for (tableKey = table->key_info; tableKey < tableEnd; tableKey++)
- if (!strcmp(tableKey->name, key->name))
- break;
-
- // Verify that each part is nullable
-
- if (tableKey >= tableEnd)
- for (uint p = 0; p < key->key_parts; p++)
- {
- KEY_PART_INFO *keyPart = key->key_part + p;
- if (keyPart && !keyPart->field->real_maybe_null())
- {
- DBUG_PRINT("info",("Online add index columns must be nullable"));
- DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
- }
- }
- }
- }
- }
-
- if (alter_flags->is_set(HA_DROP_INDEX) || alter_flags->is_set(HA_DROP_UNIQUE_INDEX))
- {
- }
-
DBUG_RETURN(HA_ALTER_SUPPORTED_NO_LOCK);
}
| Thread |
|---|
| • bzr commit into mysql-6.0 branch (kgeorge:2855) | Georgi Kodinov | 7 Oct |