List: Commits « Previous Message | Next Message »
From: Georgi Kodinov    Date: October 7 2008 5:05pm
Subject: bzr commit into mysql-6.0 branch (kgeorge:2855)
View as plain text  
#At file:///home/kgeorge/mysql/bzr/merge-6.0-bugteam/

 2855 Georgi Kodinov	2008-10-07 [merge]
      merged 6.0-main -> 6.0-bugteam
removed:
  libmysql/dll.c
added:
  sql/replication.h
  sql/rpl_handler.cc
  sql/rpl_handler.h
renamed:
  mysql-test/suite/falcon_team/r/falcon_bug_23945.result => mysql-test/suite/falcon/r/falcon_bug_23945.result
  mysql-test/suite/falcon_team/r/falcon_bug_34892.result => mysql-test/suite/falcon/r/falcon_bug_34892.result
  mysql-test/suite/falcon_team/t/falcon_bug_23945.test => mysql-test/suite/falcon/t/falcon_bug_23945.test
  mysql-test/suite/falcon_team/t/falcon_bug_34892.test => mysql-test/suite/falcon/t/falcon_bug_34892.test
modified:
  include/mysql/plugin.h
  include/mysql/plugin.h.pp
  libmysql/CMakeLists.txt
  libmysql/Makefile.am
  libmysqld/CMakeLists.txt
  libmysqld/Makefile.am
  mysql-test/suite/falcon/r/falcon_bug_34890.result
  mysql-test/suite/falcon/r/falcon_online_index.result
  mysql-test/suite/falcon/t/falcon_bug_34890.test
  mysql-test/suite/falcon/t/falcon_online_index.test
  mysql-test/suite/falcon_team/t/disabled.def
  mysql-test/suite/falcon_team/t/test2bug.def
  sql/CMakeLists.txt
  sql/Makefile.am
  sql/log.cc
  sql/log.h
  sql/mysql_priv.h
  sql/mysqld.cc
  sql/slave.cc
  sql/sql_insert.cc
  sql/sql_parse.cc
  sql/sql_plugin.cc
  sql/sql_plugin.h
  sql/sql_repl.cc
  sql/transaction.cc
  storage/falcon/Cache.cpp
  storage/falcon/Cache.h
  storage/falcon/Database.cpp
  storage/falcon/DeferredIndex.cpp
  storage/falcon/SRLUpdateIndex.cpp
  storage/falcon/SerialLogWindow.cpp
  storage/falcon/StorageHandler.cpp
  storage/falcon/StorageVersion.h
  storage/falcon/Table.cpp
  storage/falcon/Transaction.cpp
  storage/falcon/Transaction.h
  storage/falcon/ha_falcon.cpp
  mysql-test/suite/falcon/r/falcon_bug_23945.result
  mysql-test/suite/falcon/t/falcon_bug_23945.test
  mysql-test/suite/falcon/t/falcon_bug_34892.test

=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h	2008-06-28 11:00:59 +0000
+++ b/include/mysql/plugin.h	2008-09-23 14:33:18 +0000
@@ -16,6 +16,11 @@
 #ifndef _my_plugin_h
 #define _my_plugin_h
 
+/* size_t */
+#include <stdlib.h>
+
+typedef struct st_mysql MYSQL;
+
 #ifdef __cplusplus
 class THD;
 class Item;
@@ -66,7 +71,8 @@ typedef struct st_mysql_xid MYSQL_XID;
 #define MYSQL_DAEMON_PLUGIN          3  /* The daemon/raw plugin type */
 #define MYSQL_INFORMATION_SCHEMA_PLUGIN  4  /* The I_S plugin type */
 #define MYSQL_AUDIT_PLUGIN           5  /* The Audit plugin type        */
-#define MYSQL_MAX_PLUGIN_TYPE_NUM    6  /* The number of plugin types   */
+#define MYSQL_REPLICATION_PLUGIN     6	/* The replication plugin type */
+#define MYSQL_MAX_PLUGIN_TYPE_NUM    7  /* The number of plugin types   */
 
 /* We use the following strings to define licenses for plugins */
 #define PLUGIN_LICENSE_PROPRIETARY 0
@@ -461,6 +467,19 @@ struct handlerton;
 
 
 /*************************************************************************
+  API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
+*/
+#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
+
+/**
+   Replication plugin descriptor
+*/
+struct Mysql_replication {
+  int interface_version;
+};
+
+
+/*************************************************************************
   st_mysql_value struct for reading values from mysqld.
   Used by server variables framework to parse user-provided values.
   Will be used for arguments when implementing UDFs.
@@ -613,6 +632,64 @@ void mysql_query_cache_invalidate4(MYSQL
                                    const char *key, unsigned int key_length,
                                    int using_trx);
 
+/**
+   Get the value of user variable as an integer.
+
+   This function will return the value of variable @a name as an
+   integer. If the original value of the variable is not an integer,
+   the value will be converted into an integer.
+
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value);
+
+/**
+   Get the value of user variable as a double precision float number.
+
+   This function will return the value of variable @a name as real
+   number. If the original value of the variable is not a real number,
+   the value will be converted into a real number.
+
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_real(const char *name,
+                      double *value, int *null_value);
+
+/**
+   Get the value of user variable as a string.
+
+   This function will return the value of variable @a name as
+   string. If the original value of the variable is not a string,
+   the value will be converted into a string.
+
+   @param name     user variable name
+   @param value    pointer to the value buffer
+   @param len      length of the value buffer
+   @param precision precision of the value if it is a float number
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_str(const char *name,
+                     char *value, unsigned long len,
+                     unsigned int precision, int *null_value);
+
+  
 #ifdef __cplusplus
 }
 #endif

=== modified file 'include/mysql/plugin.h.pp'
--- a/include/mysql/plugin.h.pp	2008-09-12 08:58:52 +0000
+++ b/include/mysql/plugin.h.pp	2008-10-07 17:04:28 +0000
@@ -1,3 +1,5 @@
+#include <stdlib.h>
+typedef struct st_mysql MYSQL;
 struct st_mysql_lex_string
 {
   char *str;
@@ -106,6 +108,9 @@ struct st_mysql_storage_engine
   int interface_version;
 };
 struct handlerton;
+struct Mysql_replication {
+  int interface_version;
+};
 struct st_mysql_value
 {
   int (*value_type)(struct st_mysql_value *);
@@ -139,3 +144,10 @@ void thd_get_xid(const void* thd, MYSQL_
 void mysql_query_cache_invalidate4(void* thd,
                                    const char *key, unsigned int key_length,
                                    int using_trx);
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value);
+int get_user_var_real(const char *name,
+                      double *value, int *null_value);
+int get_user_var_str(const char *name,
+                     char *value, unsigned long len,
+                     unsigned int precision, int *null_value);

=== modified file 'libmysql/CMakeLists.txt'
--- a/libmysql/CMakeLists.txt	2008-09-09 19:02:38 +0000
+++ b/libmysql/CMakeLists.txt	2008-10-07 17:04:28 +0000
@@ -119,7 +119,7 @@ ADD_LIBRARY(mysqlclient_notls STATIC ${C
 ADD_DEPENDENCIES(mysqlclient_notls GenError)
 TARGET_LINK_LIBRARIES(mysqlclient_notls)
 
-ADD_LIBRARY(libmysql          SHARED ${CLIENT_SOURCES} dll.c libmysql.def)
+ADD_LIBRARY(libmysql          SHARED ${CLIENT_SOURCES} libmysql.def)
 IF(WIN32)
   SET_TARGET_PROPERTIES(libmysql mysqlclient PROPERTIES COMPILE_FLAGS "-DUSE_TLS")
 ENDIF(WIN32)

=== modified file 'libmysql/Makefile.am'
--- a/libmysql/Makefile.am	2007-11-22 11:39:07 +0000
+++ b/libmysql/Makefile.am	2008-09-29 17:47:27 +0000
@@ -31,7 +31,7 @@ include $(srcdir)/Makefile.shared
 libmysqlclient_la_SOURCES = $(target_sources)
 libmysqlclient_la_LIBADD = $(target_libadd) $(yassl_las)
 libmysqlclient_la_LDFLAGS = $(target_ldflags)
-EXTRA_DIST = Makefile.shared libmysql.def dll.c CMakeLists.txt
+EXTRA_DIST = Makefile.shared libmysql.def CMakeLists.txt
 noinst_HEADERS = client_settings.h
 
 link_sources:

=== removed file 'libmysql/dll.c'
--- a/libmysql/dll.c	2008-09-01 22:30:06 +0000
+++ b/libmysql/dll.c	1970-01-01 00:00:00 +0000
@@ -1,125 +0,0 @@
-/* Copyright (C) 2000-2004 MySQL AB
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation.
-
-   There are special exceptions to the terms and conditions of the GPL as it
-   is applied to this software. View the full text of the exception in file
-   EXCEPTIONS-CLIENT in the directory of this software distribution.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
-
-/*
-** Handling initialization of the dll library
-*/
-
-#include <my_global.h>
-#include <my_sys.h>
-#include <my_pthread.h>
-
-static my_bool libmysql_inited=0;
-
-void libmysql_init(void)
-{
-  if (libmysql_inited)
-    return;
-  libmysql_inited=1;
-  my_init();
-  {
-    DBUG_ENTER("libmysql_init");
-#ifdef LOG_ALL
-    DBUG_PUSH("d:t:S:O,c::\\tmp\\libmysql.log");
-#else
-    if (getenv("LIBMYSQL_LOG") != NULL)
-      DBUG_PUSH(getenv("LIBMYSQL_LOG"));
-#endif
-    DBUG_VOID_RETURN;
-  }
-}
-
-#ifdef __WIN__
-
-static int inited=0,threads=0;
-HINSTANCE NEAR s_hModule;	/* Saved module handle */
-DWORD main_thread;
-
-BOOL APIENTRY LibMain(HANDLE hInst,DWORD ul_reason_being_called,
-		      LPVOID lpReserved)
-{
-  switch (ul_reason_being_called) {
-  case DLL_PROCESS_ATTACH:	/* case of libentry call in win 3.x */
-    if (!inited++)
-    {
-      s_hModule=hInst;
-      libmysql_init();
-      main_thread=GetCurrentThreadId();
-    }
-    break;
-  case DLL_THREAD_ATTACH:
-    threads++;
-    my_thread_init();
-    break;
-  case DLL_PROCESS_DETACH:	/* case of wep call in win 3.x */
-     if (!--inited)		/* Safety */
-     {
-       /* my_thread_init() */	/* This may give extra safety */
-       my_end(0);
-     }
-    break;
-  case DLL_THREAD_DETACH:
-    /* Main thread will free by my_end() */
-    threads--;
-    if (main_thread != GetCurrentThreadId())
-      my_thread_end();
-    break;
-  default:
-    break;
-  } /* switch */
-
-  return TRUE;
-
-  UNREFERENCED_PARAMETER(lpReserved);
-} /* LibMain */
-
-
-static BOOL do_libmain;
-int __stdcall DllMain(HANDLE hInst,DWORD ul_reason_being_called,LPVOID lpReserved)
-{
-  /*
-    Unless environment variable LIBMYSQL_DLLINIT is set, do nothing.
-    The environment variable is checked once, during the first call to DllMain()
-    (in DLL_PROCESS_ATTACH hook).
-  */
-  if (ul_reason_being_called == DLL_PROCESS_ATTACH)
-    do_libmain = (getenv("LIBMYSQL_DLLINIT") != NULL);
-  if (do_libmain)
-    return LibMain(hInst,ul_reason_being_called,lpReserved);
-  return TRUE;
-}
-
-#elif defined(WINDOWS)
-
-/****************************************************************************
-**	This routine is called by LIBSTART.ASM at module load time.  All it
-**	does in this sample is remember the DLL module handle.	The module
-**	handle is needed if you want to do things like load stuff from the
-**	resource file (for instance string resources).
-****************************************************************************/
-
-int _export FAR PASCAL libmain(HANDLE hModule,short cbHeapSize,
-			       UCHAR FAR *lszCmdLine)
-{
-  s_hModule = hModule;
-  libmysql_init();
-  return TRUE;
-}
-
-#endif

=== modified file 'libmysqld/CMakeLists.txt'
--- a/libmysqld/CMakeLists.txt	2008-08-08 01:33:43 +0000
+++ b/libmysqld/CMakeLists.txt	2008-09-23 14:33:18 +0000
@@ -204,6 +204,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libm
            ../sql/ddl_blocker.cc ../sql/si_objects.cc
            ../sql/event_parse_data.cc ../sql/mdl.cc
            ../sql/transaction.cc
+	   ../sql/rpl_handler.cc
            ${GEN_SOURCES}
            ${LIB_SOURCES})
 

=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2008-08-08 01:33:43 +0000
+++ b/libmysqld/Makefile.am	2008-09-23 14:33:18 +0000
@@ -81,7 +81,8 @@ sqlsources = derror.cc field.cc field_co
 	debug_sync.cc sql_tablespace.cc transaction.cc \
 	rpl_injector.cc my_user.c partition_info.cc \
 	sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
-        event_parse_data.cc mdl.cc
+        event_parse_data.cc mdl.cc \
+        rpl_handler.cc
 
 libmysqld_int_a_SOURCES= $(libmysqld_sources)
 nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)

=== renamed file 'mysql-test/suite/falcon_team/r/falcon_bug_23945.result' => 'mysql-test/suite/falcon/r/falcon_bug_23945.result'
--- a/mysql-test/suite/falcon_team/r/falcon_bug_23945.result	2008-09-09 08:30:18 +0000
+++ b/mysql-test/suite/falcon/r/falcon_bug_23945.result	2008-10-02 10:37:27 +0000
@@ -15,3 +15,5 @@ SELECT * FROM t1;
 ERROR 42S02: Table 'test.t1' doesn't exist
 DROP TABLE t1;
 ERROR 42S02: Unknown table 't1'
+CREATE TABLE t1(a INT);
+DROP TABLE t1;

=== modified file 'mysql-test/suite/falcon/r/falcon_bug_34890.result'
--- a/mysql-test/suite/falcon/r/falcon_bug_34890.result	2008-03-27 16:50:16 +0000
+++ b/mysql-test/suite/falcon/r/falcon_bug_34890.result	2008-10-06 19:17:21 +0000
@@ -16,7 +16,7 @@ CREATE TABLE t1 (
 t1_autoinc INTEGER NOT NULL AUTO_INCREMENT,
 t1_uuid CHAR(36),
 PRIMARY KEY (t1_autoinc)
-) ENGINE = Falcon;
+);
 CREATE PROCEDURE p1 ()
 begin
 DECLARE my_count INT DEFAULT 0;

=== renamed file 'mysql-test/suite/falcon_team/r/falcon_bug_34892.result' => 'mysql-test/suite/falcon/r/falcon_bug_34892.result'
=== modified file 'mysql-test/suite/falcon/r/falcon_online_index.result'
--- a/mysql-test/suite/falcon/r/falcon_online_index.result	2008-09-10 15:08:56 +0000
+++ b/mysql-test/suite/falcon/r/falcon_online_index.result	2008-10-03 05:15:40 +0000
@@ -72,14 +72,14 @@ affected rows: 0
 info: Records: 0  Duplicates: 0  Warnings: 0
 #-------- Testing implicit OFFLINE --------#
 ALTER TABLE t3 ADD INDEX ix_c (c);
-affected rows: 1000
-info: Records: 1000  Duplicates: 0  Warnings: 0
+affected rows: 0
+info: Records: 0  Duplicates: 0  Warnings: 0
 DROP INDEX ix_c ON t3;
 affected rows: 0
 info: Records: 0  Duplicates: 0  Warnings: 0
 ALTER TABLE t3 ADD INDEX ix_cd (c, d);
-affected rows: 1000
-info: Records: 1000  Duplicates: 0  Warnings: 0
+affected rows: 0
+info: Records: 0  Duplicates: 0  Warnings: 0
 DROP INDEX ix_cd ON t3;
 affected rows: 0
 info: Records: 0  Duplicates: 0  Warnings: 0
@@ -108,10 +108,8 @@ a	b	c	d	e
 ALTER ONLINE TABLE t3 DROP INDEX ix_b;
 #-------- ONLINE: ALTER ADD not-null with default --------#
 ALTER ONLINE TABLE t3 ADD INDEX ix_c (c);
-ERROR 42000: This version of MySQL doesn't yet support 'ALTER ONLINE TABLE t3 ADD INDEX ix_c (c)'
 #-------- ONLINE: ALTER ADD not-null --------#
 ALTER ONLINE TABLE t3 ADD INDEX ix_d (d);
-ERROR 42000: This version of MySQL doesn't yet support 'ALTER ONLINE TABLE t3 ADD INDEX ix_d (d)'
 #-------- ONLINE: ALTER ADD same key multiple times --------#
 ALTER ONLINE TABLE t1 ADD INDEX index_c (c);
 ALTER ONLINE TABLE t1 ADD INDEX index_c (c);
@@ -171,6 +169,8 @@ ALTER ONLINE TABLE t3 ADD INDEX ix_asc_b
 SHOW INDEXES FROM t3;
 Table	Non_unique	Key_name	Seq_in_index	Column_name	Collation	Cardinality	Sub_part	Packed	Null	Index_type	Comment	Index_Comment
 t3	0	PRIMARY	1	a	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_c	1	c	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_d	1	d	NULL	500	NULL	NULL		BTREE		
 t3	1	ix_desc_b	1	b	NULL	500	NULL	NULL	YES	BTREE		
 t3	1	ix_asc_b	1	b	NULL	500	NULL	NULL	YES	BTREE		
 DROP ONLINE INDEX ix_desc_b ON t3;
@@ -265,6 +265,8 @@ t1	1	index_int	1	c	NULL	10	NULL	NULL	YES
 SHOW INDEXES FROM t3;
 Table	Non_unique	Key_name	Seq_in_index	Column_name	Collation	Cardinality	Sub_part	Packed	Null	Index_type	Comment	Index_Comment
 t3	0	PRIMARY	1	a	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_c	1	c	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_d	1	d	NULL	500	NULL	NULL		BTREE		
 t3	1	index_int	1	b	NULL	500	NULL	NULL	YES	BTREE		
 t3	1	index_multi	1	b	NULL	250	NULL	NULL	YES	BTREE		
 t3	1	index_multi	2	e	NULL	500	NULL	NULL	YES	BTREE		
@@ -283,6 +285,8 @@ t1	0	PRIMARY	1	a	NULL	10	NULL	NULL		BTRE
 SHOW INDEXES FROM t3;
 Table	Non_unique	Key_name	Seq_in_index	Column_name	Collation	Cardinality	Sub_part	Packed	Null	Index_type	Comment	Index_Comment
 t3	0	PRIMARY	1	a	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_c	1	c	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_d	1	d	NULL	500	NULL	NULL		BTREE		
 #-------- Test: Combined ADD/DROP INDEX in a single statement --------#
 ALTER TABLE t1 ADD INDEX index_int (c);
 ALTER TABLE t1 ADD INDEX index_char (d), DROP INDEX index_int;
@@ -344,7 +348,7 @@ a	b	c	d	a	b	c	d
 16	TestRow16	32	Char16	31	62	32	SomeString 31 for testing
 EXPLAIN SELECT * FROM t1, t3 WHERE t3.b=2 AND (t1.c = t3.c OR t1.a=t3.d);
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t3	ref	ix_b	ix_b	5	const	100	
+1	SIMPLE	t3	ref	ix_c,ix_d,ix_b	ix_b	5	const	100	
 1	SIMPLE	t1	ALL	PRIMARY,ix_a,ix_c	NULL	NULL	NULL	20	Range checked for each record (index map: 0xB)
 SELECT * FROM t1, t3 WHERE t3.b=2 AND (t1.c = t3.c OR t1.a=t3.d);
 a	b	c	d	a	b	c	d	e
@@ -367,6 +371,8 @@ t2	0	PRIMARY	1	a	NULL	16	NULL	NULL		BTRE
 SHOW INDEXES FROM t3;
 Table	Non_unique	Key_name	Seq_in_index	Column_name	Collation	Cardinality	Sub_part	Packed	Null	Index_type	Comment	Index_Comment
 t3	0	PRIMARY	1	a	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_c	1	c	NULL	500	NULL	NULL		BTREE		
+t3	1	ix_d	1	d	NULL	500	NULL	NULL		BTREE		
 DROP TABLE t1;
 DROP TABLE t2;
 DROP TABLE t3;

=== renamed file 'mysql-test/suite/falcon_team/t/falcon_bug_23945.test' => 'mysql-test/suite/falcon/t/falcon_bug_23945.test'
--- a/mysql-test/suite/falcon_team/t/falcon_bug_23945.test	2008-09-09 08:30:18 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_23945.test	2008-10-02 10:37:27 +0000
@@ -36,6 +36,9 @@ SELECT * FROM t1;
 --error ER_BAD_TABLE_ERROR
 DROP TABLE t1;
 
+CREATE TABLE t1(a INT);
+DROP TABLE t1;
+
 # ----------------------------------------------------- #
 # --- Check                                         --- #
 # ----------------------------------------------------- #

=== modified file 'mysql-test/suite/falcon/t/falcon_bug_34890.test'
--- a/mysql-test/suite/falcon/t/falcon_bug_34890.test	2008-03-27 16:50:16 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_34890.test	2008-10-06 19:17:21 +0000
@@ -1,9 +1,10 @@
 --source include/have_falcon.inc
+
 #
 # Bug #34890: Update Conflict on non-overlapping transactions
-# This test works because it uses FALCON_CONSISTENT_READ=OFF
-# This test is different from 34351_C in that there is no index
-# on t1_uuid.
+#   This test works because it uses FALCON_CONSISTENT_READ=OFF
+#   This test is different from 34351_C in that there is no index
+#   on t1_uuid.
 #
 --echo *** Bug #34890 ***
 
@@ -42,9 +43,7 @@ CREATE TABLE t1 (
   t1_autoinc INTEGER NOT NULL AUTO_INCREMENT,
   t1_uuid CHAR(36),
   PRIMARY KEY (t1_autoinc)
-) ENGINE = Falcon;
-
-#   declare continue handler for sqlexception
+);
 
 delimiter //;
 CREATE PROCEDURE p1 ()
@@ -64,7 +63,6 @@ delimiter ;//
 # ----------------------------------------------------- #
 # --- Test                                          --- #
 # ----------------------------------------------------- #
-
 --echo # Switch to connection conn1
 connection conn1;
 --echo # Send call p1() to the server but do not pull the results
@@ -110,11 +108,9 @@ connection conn4;
 --echo # Pull the results of the preceeding call p1() by conn4
 --reap
 
-
 --echo # Switch to connection default
 connection default;
 
-
 # ----------------------------------------------------- #
 # --- Check                                         --- #
 # ----------------------------------------------------- #

=== renamed file 'mysql-test/suite/falcon_team/t/falcon_bug_34892.test' => 'mysql-test/suite/falcon/t/falcon_bug_34892.test'
--- a/mysql-test/suite/falcon_team/t/falcon_bug_34892.test	2008-04-20 00:05:17 +0000
+++ b/mysql-test/suite/falcon/t/falcon_bug_34892.test	2008-10-06 19:17:21 +0000
@@ -1,6 +1,7 @@
 --source include/have_falcon.inc
+
 #
-# Bug #34892: Some falcon tests fail sporadically
+# Bug #34892: Transaction handling in select_create::abort let's Falcon fail
 #
 --echo *** Bug #34892 ***
 
@@ -21,8 +22,6 @@ SET @@autocommit = 0;
 --error ER_DUP_ENTRY
 CREATE TABLE t1 (PRIMARY KEY (a)) SELECT 1 AS a UNION ALL SELECT 1;
 
-# This CREATE TABLE provokes a
-# 1005: Can't create table 'test.t1' (errno: 156).
 CREATE TABLE t1 (a int);
 
 # ----------------------------------------------------- #

=== modified file 'mysql-test/suite/falcon/t/falcon_online_index.test'
--- a/mysql-test/suite/falcon/t/falcon_online_index.test	2008-09-10 15:08:56 +0000
+++ b/mysql-test/suite/falcon/t/falcon_online_index.test	2008-10-03 05:15:40 +0000
@@ -163,14 +163,12 @@ ALTER ONLINE TABLE t3 DROP INDEX ix_b;
 
 --echo #-------- ONLINE: ALTER ADD not-null with default --------#
 
-# Test that ALTER ONLINE ... ADD INDEX fails with ONLINE for non-nullable column having default value
---error ER_NOT_SUPPORTED_YET
+# Test that ALTER ONLINE ... ADD INDEX succeeds with ONLINE for non-nullable column having default value
 ALTER ONLINE TABLE t3 ADD INDEX ix_c (c);
 
 --echo #-------- ONLINE: ALTER ADD not-null --------#
 
-# Test that ALTER ONLINE ... ADD INDEX fails with ONLINE for non-nullable columns
---error ER_NOT_SUPPORTED_YET
+# Test that ALTER ONLINE ... ADD INDEX succeeds with ONLINE for non-nullable columns
 ALTER ONLINE TABLE t3 ADD INDEX ix_d (d);
 
 --echo #-------- ONLINE: ALTER ADD same key multiple times --------#

=== modified file 'mysql-test/suite/falcon_team/t/disabled.def'
--- a/mysql-test/suite/falcon_team/t/disabled.def	2008-09-10 22:35:51 +0000
+++ b/mysql-test/suite/falcon_team/t/disabled.def	2008-10-02 10:37:27 +0000
@@ -17,4 +17,3 @@
 #    which should probably be attached to a bug report instead.
 #    Also please keep the list sorted.
 
-falcon_bug_23945 : Bug#34892 2008-09-10 hky Test failure brings Falcon's data dictionary out of sync

=== modified file 'mysql-test/suite/falcon_team/t/test2bug.def'
--- a/mysql-test/suite/falcon_team/t/test2bug.def	2008-09-28 07:16:36 +0000
+++ b/mysql-test/suite/falcon_team/t/test2bug.def	2008-10-02 10:37:27 +0000
@@ -20,5 +20,4 @@
 falcon_bug_23945: Bug#34892 - Transaction handling in select_create::abort let's Falcon fail
 falcon_bug_26433: Bug#39314 - falcon_bug_26433 fails with an offset of 1 in row numbers in expected warnings
 falcon_bug_28048: Bug#36700 - Running falcon_bug_28048 shows increasing memory usage and run time
-falcon_bug_34892: Bug#34892 - Transaction handling in select_create::abort let's Falcon fail
 falcon_deadlock:  Bug#34182 - SELECT ... FOR UPDATE does not lock when in subquery

=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt	2008-09-16 08:34:30 +0000
+++ b/sql/CMakeLists.txt	2008-09-26 16:30:56 +0000
@@ -78,6 +78,7 @@ ADD_EXECUTABLE(mysqld
                sql_connect.cc scheduler.cc transaction.cc
                ddl_blocker.cc si_objects.cc si_logs.cc
                sql_profile.cc event_parse_data.cc mdl.cc
+               rpl_handler.cc
                ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
                ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
                ${PROJECT_SOURCE_DIR}/include/mysqld_error.h

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2008-09-30 05:24:12 +0000
+++ b/sql/Makefile.am	2008-10-01 12:09:26 +0000
@@ -89,7 +89,8 @@ noinst_HEADERS =	item.h item_func.h item
 			sql_partition.h partition_info.h partition_element.h \
 			probes.h sql_audit.h transaction.h \
 			contributors.h sql_servers.h ddl_blocker.h \
-			si_objects.h si_logs.h sql_plist.h mdl.h records.h
+			si_objects.h si_logs.h sql_plist.h mdl.h records.h \
+			rpl_handler.h replication.h
 
 mysqld_SOURCES =	sql_lex.cc sql_handler.cc sql_partition.cc \
 			item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -136,7 +137,8 @@ mysqld_SOURCES =	sql_lex.cc sql_handler.
 			sql_builtin.cc sql_tablespace.cc partition_info.cc \
 			sql_servers.cc sql_audit.cc sha2.cc \
 			ddl_blocker.cc si_objects.cc si_logs.cc \
-			event_parse_data.cc mdl.cc transaction.cc
+			event_parse_data.cc mdl.cc transaction.cc \
+			rpl_handler.cc
 
 if HAVE_DTRACE
   mysqld_SOURCES += probes.d

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2008-10-06 17:04:15 +0000
+++ b/sql/log.cc	2008-10-07 17:04:28 +0000
@@ -39,6 +39,7 @@
 #endif
 
 #include <mysql/plugin.h>
+#include "rpl_handler.h"
 #include "si_logs.h"
 
 /* max size of the log message */
@@ -4895,9 +4896,11 @@ err:
 }
 
 
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
 {
   int err=0, fd=log_file.file;
+  if (synced)
+    *synced= 0;
   safe_mutex_assert_owner(&LOCK_log);
   if (flush_io_cache(&log_file))
     return 1;
@@ -4905,6 +4908,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
   {
     sync_binlog_counter= 0;
     err=my_sync(fd, MYF(MY_WME));
+    if (synced)
+      *synced= 1;
   }
   return err;
 }
@@ -5194,7 +5199,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
 
     if (file == &log_file)
     {
-      error= flush_and_sync();
+      error= flush_and_sync(0);
       if (!error)
       {
         signal_update();
@@ -5377,8 +5382,15 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
 
     if (file == &log_file) // we are writing to the real log (disk)
     {
-      if (flush_and_sync())
+      bool synced;
+      if (flush_and_sync(&synced))
 	goto err;
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+                   (thd, log_file_name, file->pos_in_file, synced))) {
+        goto err;
+      }
+
       signal_update();
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
     }
@@ -5639,7 +5651,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE 
   DBUG_ASSERT(carry == 0);
 
   if (sync_log)
-    flush_and_sync();
+    flush_and_sync(0);
 
   return 0;                                     // All OK
 }
@@ -5725,7 +5737,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
 
       if (commit_event && commit_event->write(&log_file))
         goto err;
-      if (flush_and_sync())
+      
+      bool synced;
+      if (flush_and_sync(&synced))
         goto err;
       DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT(););
       if (cache->error)				// Error on read
@@ -5734,6 +5748,11 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
         write_error=1;				// Don't give more errors
         goto err;
       }
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+                   (thd, log_file_name, log_file.pos_in_file, synced)))
+        goto err;
+
       signal_update();
     }
 

=== modified file 'sql/log.h'
--- a/sql/log.h	2008-10-01 12:02:28 +0000
+++ b/sql/log.h	2008-10-07 17:04:28 +0000
@@ -459,7 +459,21 @@ public:
   bool is_active(const char* log_file_name);
   int update_log_index(LOG_INFO* linfo, bool need_update_threads);
   void rotate_and_purge(uint flags);
-  bool flush_and_sync();
+
+  /**
+     Flush binlog cache and synchronize to disk.
+
+     This function flushes events in the binlog cache to the binary
+     log file, then synchronizes the file to disk according to the
+     setting of the system variable 'sync_binlog'. If the file is
+     synchronized, @c synced will be set to 1, otherwise 0.
+
+     @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+     @retval 0 Success
+     @retval other Failure
+  */
+  bool flush_and_sync(bool *synced);
   int purge_logs(const char *to_log, bool included,
                  bool need_mutex, bool need_update_threads,
                  ulonglong *decrease_log_space);

=== modified file 'sql/mysql_priv.h'
--- a/sql/mysql_priv.h	2008-10-02 14:03:57 +0000
+++ b/sql/mysql_priv.h	2008-10-07 17:04:28 +0000
@@ -140,7 +140,7 @@ char* query_table_status(THD *thd,const 
 #define WARN_DEPRECATED(Thd,VerHi,VerLo,Old,New)                            \
   do {                                                                      \
     compile_time_assert(MYSQL_VERSION_ID < VerHi * 10000 + VerLo * 100);    \
-    if (Thd)                                                                \
+    if ((Thd) != NULL)                                                      \
       push_warning_printf((Thd), MYSQL_ERROR::WARN_LEVEL_WARN,              \
                         ER_WARN_DEPRECATED_SYNTAX,                          \
                         ER(ER_WARN_DEPRECATED_SYNTAX_WITH_VER),             \

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-10-07 13:38:49 +0000
+++ b/sql/mysqld.cc	2008-10-07 17:04:28 +0000
@@ -33,6 +33,8 @@
 
 #include "rpl_injector.h"
 
+#include "rpl_handler.h"
+
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
 #endif
@@ -1352,6 +1354,7 @@ void clean_up(bool print_message)
   ha_end();
   if (tc_log)
     tc_log->close();
+  delegates_destroy();
   xid_cache_free();
   delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
   multi_keycache_free();
@@ -3946,6 +3949,13 @@ static int init_server_components()
     unireg_abort(1);
   }
 
+  /* initialize delegates for extension observers */
+  if (delegates_init())
+  {
+    sql_print_error("Initialize extension delegates failed");
+    unireg_abort(1);
+  }
+
   /* need to configure logging before initializing storage engines */
   if (opt_update_log)
   {

=== added file 'sql/replication.h'
--- a/sql/replication.h	1970-01-01 00:00:00 +0000
+++ b/sql/replication.h	2008-09-23 15:06:18 +0000
@@ -0,0 +1,461 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+   Transaction observer flags.
+*/
+enum Trans_flags {
+  /** Transaction is a real transaction */
+  TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+   Transaction observer parameter
+*/
+typedef struct Trans_param {
+  uint32 server_id;
+  uint32 flags;
+
+  /*
+    The latest binary log file name and position written by the
+    current transaction. If the binary log is disabled, or the current
+    transaction has not written any event to the binary log file
+    (events written to the transaction log cache are not counted),
+    these two members will be zero.
+  */
+  const char *log_file;
+  my_off_t log_pos;
+} Trans_param;
+
+/**
+   Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+  uint32 len;
+
+  /**
+     This callback is called after transaction commit
+
+     This callback is called right after commit to storage engines for
+     transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     succeeded.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_commit)(Trans_param *param);
+
+  /**
+     This callback is called after transaction rollback
+
+     This callback is called right after rollback to storage engines
+     for transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     failed.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+   Binlog storage flags
+*/
+enum Binlog_storage_flags {
+  /** Binary log was sync:ed */
+  BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+   Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+  uint32 server_id;
+} Binlog_storage_param;
+
+/**
+   Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+  uint32 len;
+
+  /**
+     This callback is called after binlog has been flushed
+
+     This callback is called after cached events have been flushed to
+     binary log file. Whether the binary log file is synchronized to
+     disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+     @param param Observer common parameter
+     @param log_file Name of the binlog file that has been updated
+     @param log_pos Binlog position after update
+     @param flags flags for binlog storage
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_flush)(Binlog_storage_param *param,
+                     const char *log_file, my_off_t log_pos,
+                     uint32 flags);
+} Binlog_storage_observer;
+
+/**
+   Replication binlog transmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+  uint32 server_id;
+  uint32 flags;
+} Binlog_transmit_param;
+
+/**
+   Observes and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+  uint32 len;
+  
+  /**
+     This callback is called when binlog dumping starts
+
+
+     @param param Observer common parameter
+     @param log_file Binlog file name to transmit from
+     @param log_pos Binlog position to transmit from
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*transmit_start)(Binlog_transmit_param *param,
+                        const char *log_file, my_off_t log_pos);
+
+  /**
+     This callback is called when binlog dumping stops
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*transmit_stop)(Binlog_transmit_param *param);
+
+  /**
+     This callback is called to reserve bytes in packet header for event transmission
+
+     This callback is called when resetting transmit packet header to
+     reserve bytes for this observer in packet header.
+
+     The @a header buffer is allocated by the server code, and @a size
+     is the size of the header buffer. Each observer can only reserve
+     a maximum size of @a size in the header.
+
+     @param param Observer common parameter
+     @param header Pointer to the header buffer
+     @param size Size of the header buffer
+     @param len Header length reserved by this observer
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*reserve_header)(Binlog_transmit_param *param,
+                        unsigned char *header,
+                        unsigned long size,
+                        unsigned long *len);
+
+  /**
+     This callback is called before sending an event packet to slave
+
+     @param param Observer common parameter
+     @param packet Binlog event packet to send
+     @param len Length of the event packet
+     @param log_file Binlog file name of the event packet to send
+     @param log_pos Binlog position of the event packet to send
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*before_send_event)(Binlog_transmit_param *param,
+                           unsigned char *packet, unsigned long len,
+                           const char *log_file, my_off_t log_pos );
+
+  /**
+     This callback is called after sending an event packet to slave
+
+     @param param Observer common parameter
+     @param event_buf Binlog event packet buffer sent
+     @param len length of the event packet buffer
+
+     @retval 0 Success
+     @retval 1 Failure
+   */
+  int (*after_send_event)(Binlog_transmit_param *param,
+                          const char *event_buf, unsigned long len);
+
+  /**
+     This callback is called after resetting master status
+
+     This is called when executing the command RESET MASTER, and is
+     used to reset status variables added by observers.
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+   Binlog relay IO flags
+*/
+enum Binlog_relay_IO_flags {
+  /** Binary relay log was sync:ed */
+  BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+  Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+  uint32 server_id;
+
+  /* Master host, user and port */
+  char *host;
+  char *user;
+  unsigned int port;
+
+  char *master_log_name;
+  my_off_t master_log_pos;
+
+  MYSQL *mysql;                        /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+   Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+  uint32 len;
+
+  /**
+     This callback is called when slave IO thread starts
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*thread_start)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called when slave IO thread stops
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*thread_stop)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called before the slave requests binlog transmission from master
+
+     This is called before the slave issues the BINLOG_DUMP command to
+     the master to request binlog.
+
+     @param param Observer common parameter
+     @param flags binlog dump flags
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+  /**
+     This callback is called after reading an event packet from master
+
+     @param param Observer common parameter
+     @param packet The event packet read from master
+     @param len Length of the event packet read from master
+     @param event_buf The event packet returned after processing
+     @param event_len The length of the event packet returned after processing
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_read_event)(Binlog_relay_IO_param *param,
+                          const char *packet, unsigned long len,
+                          const char **event_buf, unsigned long *event_len);
+
+  /**
+     This callback is called after an event packet has been written to relay log
+
+     @param param Observer common parameter
+     @param event_buf Event packet written to relay log
+     @param event_len Length of the event packet written to relay log
+     @param flags flags for relay log
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_queue_event)(Binlog_relay_IO_param *param,
+                           const char *event_buf, unsigned long event_len,
+                           uint32 flags);
+
+  /**
+     This callback is called after resetting slave relay log IO status
+
+     @param param Observer common parameter
+
+     @retval 0 Success
+     @retval 1 Failure
+  */
+  int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+   Register a transaction observer
+
+   @param observer The transaction observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Unregister a transaction observer
+
+   @param observer The transaction observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Register a binlog storage observer
+
+   @param observer The binlog storage observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Unregister a binlog storage observer
+
+   @param observer The binlog storage observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Register a binlog transmit observer
+
+   @param observer The binlog transmit observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Unregister a binlog transmit observer
+
+   @param observer The binlog transmit observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Register a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Unregister a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Success
+   @retval 1 Observer does not exist
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Connect to master
+
+   This function can only be used in the slave I/O thread context, and
+   will use the same master information to do the connection.
+
+   @code
+   MYSQL *mysql = mysql_init(NULL);
+   if (rpl_connect_master(mysql))
+   {
+     // do stuff with the connection
+   }
+   mysql_close(mysql); // close the connection
+   @endcode
+
+   @param mysql address of the MYSQL structure to use; passing NULL
+   will create a new one
+
+   @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */

=== added file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.cc	2008-09-23 15:06:18 +0000
@@ -0,0 +1,485 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+  structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+  my_off_t log_pos;
+  char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+/*
+  Read a user variable of the current session as an integer.
+
+  Returns 1 if no variable called name exists, 0 otherwise. On success
+  *value receives the result of val_int() and, when null_value is not
+  NULL, *null_value receives the NULL flag reported by val_int().
+*/
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value)
+{
+  user_var_entry *var=
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (var == NULL)
+    return 1;
+
+  my_bool is_null;
+  *value= var->val_int(&is_null);
+  if (null_value != NULL)
+    *null_value= is_null;
+  return 0;
+}
+
+/*
+  Read a user variable of the current session as a double.
+
+  Returns 1 if no variable called name exists, 0 otherwise. On success
+  *value receives the result of val_real() and, when null_value is not
+  NULL, *null_value receives the NULL flag reported by val_real().
+*/
+int get_user_var_real(const char *name,
+                      double *value, int *null_value)
+{
+  user_var_entry *var=
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (var == NULL)
+    return 1;
+
+  my_bool is_null;
+  *value= var->val_real(&is_null);
+  if (null_value != NULL)
+    *null_value= is_null;
+  return 0;
+}
+
+/**
+  Copy the string value of a user variable of the current session.
+
+  @param name        name of the user variable to look up
+  @param value       destination buffer
+  @param len         treated as the size in bytes of the destination buffer
+  @param precision   passed through to user_var_entry::val_str()
+  @param null_value  if not NULL, receives the NULL flag reported by val_str()
+
+  @retval 0 the variable was found; if len > 0, value holds a
+            NUL-terminated (possibly truncated) copy of its string value
+  @retval 1 no user variable with this name exists
+*/
+int get_user_var_str(const char *name, char *value,
+                     size_t len, unsigned int precision, int *null_value)
+{
+  String str;
+  my_bool null_val;
+  user_var_entry *entry=
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  entry->val_str(&null_val, &str, precision);
+  if (len > 0)
+  {
+    /*
+      strncpy() does not terminate the destination when the source is
+      at least as long as the limit, so copy at most len - 1 bytes and
+      terminate explicitly.
+    */
+    strncpy(value, str.c_ptr(), len - 1);
+    value[len - 1]= '\0';
+  }
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+/*
+  Construct the global delegate objects in static storage and create
+  the thread-specific key RPL_TRANS_BINLOG_INFO.
+
+  Returns 0 on success, 1 if a delegate failed to initialize or the
+  key could not be created.
+*/
+int delegates_init()
+{
+  static unsigned char trans_mem[sizeof(Trans_delegate)];
+  static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
+#ifdef HAVE_REPLICATION
+  static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
+  static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
+#endif
+
+  /* Delegates are built one at a time; the first failure stops the setup. */
+  transaction_delegate= new (trans_mem) Trans_delegate;
+  if (!transaction_delegate || !transaction_delegate->is_inited())
+    return 1;
+
+  binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate;
+  if (!binlog_storage_delegate || !binlog_storage_delegate->is_inited())
+    return 1;
+
+#ifdef HAVE_REPLICATION
+  binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate;
+  if (!binlog_transmit_delegate || !binlog_transmit_delegate->is_inited())
+    return 1;
+
+  binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate;
+  if (!binlog_relay_io_delegate || !binlog_relay_io_delegate->is_inited())
+    return 1;
+#endif /* HAVE_REPLICATION */
+
+  return pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL) ? 1 : 0;
+}
+
+/*
+  Run the destructors of the delegates created by delegates_init().
+
+  The delegates are constructed with placement new in static buffers,
+  so only the destructors are invoked. clean_up() calls this function
+  unconditionally, possibly before delegates_init() has run, so each
+  pointer is checked before use.
+*/
+void delegates_destroy()
+{
+  if (transaction_delegate)
+    transaction_delegate->~Trans_delegate();
+  if (binlog_storage_delegate)
+    binlog_storage_delegate->~Binlog_storage_delegate();
+#ifdef HAVE_REPLICATION
+  if (binlog_transmit_delegate)
+    binlog_transmit_delegate->~Binlog_transmit_delegate();
+  if (binlog_relay_io_delegate)
+    binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+  This macro is used by almost all the Delegate methods to iterate
+  over all the observers, running the given callback function of
+  each of them.
+
+  Observer plugins are added to the thd->lex list; after each statement,
+  all plugins added to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(r, f, thd, args)                               \
+  param.server_id= thd->server_id;                                      \
+  read_lock();                                                          \
+  Observer_info_iterator iter= observer_info_iter();                    \
+  Observer_info *info= iter++;                                          \
+  for (; info; info= iter++)                                            \
+  {                                                                     \
+    plugin_ref plugin=                                                  \
+      my_plugin_lock(thd, &info->plugin);                               \
+    if (!plugin)                                                        \
+    {                                                                   \
+      r= 1;                                                             \
+      break;                                                            \
+    }                                                                   \
+    if (((Observer *)info->observer)->f                                 \
+        && ((Observer *)info->observer)->f args)                        \
+    {                                                                   \
+      r= 1;                                                             \
+      break;                                                            \
+    }                                                                   \
+  }                                                                     \
+  unlock()
+
+
+/**
+  Call the after_commit callback of every registered transaction observer.
+
+  @param thd  session whose server_id is passed to the observers
+  @param all  together with thd->transaction.all.ha_list decides whether
+              TRANS_IS_REAL_TRANS is reported to the observers
+
+  @retval 0 all callbacks succeeded
+  @retval 1 a plugin could not be locked or a callback returned non-zero
+*/
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+  Trans_param param;
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  /* param is not zero-initialized: assign the flags, do not OR into them */
+  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
+}
+
+/**
+  Call the after_rollback callback of every registered transaction observer.
+
+  @param thd  session whose server_id is passed to the observers
+  @param all  together with thd->transaction.all.ha_list decides whether
+              TRANS_IS_REAL_TRANS is reported to the observers
+
+  @retval 0 all callbacks succeeded
+  @retval 1 a plugin could not be locked or a callback returned non-zero
+*/
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+  Trans_param param;
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  /* param is not zero-initialized: assign the flags, do not OR into them */
+  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  /* dispatch the rollback hook, not the commit one */
+  FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
+}
+
+/**
+  Remember the flushed binlog file name and position for the current
+  thread and call the after_flush callback of every binlog storage
+  observer.
+
+  @param thd       session whose server_id is passed to the observers
+  @param log_file  binlog file name; its directory part is stripped
+  @param log_pos   binlog position after the flush
+  @param synced    reported to observers as BINLOG_STORAGE_IS_SYNCED
+
+  @retval 0 all callbacks succeeded
+  @retval 1 out of memory, a plugin could not be locked, or a callback
+            returned non-zero
+*/
+int Binlog_storage_delegate::after_flush(THD *thd,
+                                         const char *log_file,
+                                         my_off_t log_pos,
+                                         bool synced)
+{
+  Binlog_storage_param param;
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_STORAGE_IS_SYNCED;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  if (!log_info)
+  {
+    if(!(log_info=
+         (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+      return 1;
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+  }
+
+  /* Bounded copy: never write past the fixed-size log_file buffer. */
+  strncpy(log_info->log_file, log_file+dirname_length(log_file),
+          sizeof(log_info->log_file) - 1);
+  log_info->log_file[sizeof(log_info->log_file) - 1]= '\0';
+  log_info->log_pos = log_pos;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_flush, thd,
+                   (&param, log_info->log_file, log_info->log_pos, flags));
+  return ret;
+}
+
+#ifdef HAVE_REPLICATION
+/*
+  Call the transmit_start callback of every binlog transmit observer.
+  Returns 0 on success, 1 when a plugin cannot be locked or a callback
+  returns non-zero.
+*/
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+                                             const char *log_file,
+                                             my_off_t log_pos)
+{
+  int error= 0;
+  Binlog_transmit_param param;
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, transmit_start, thd, (&param, log_file, log_pos));
+  return error;
+}
+
+/*
+  Call the transmit_stop callback of every binlog transmit observer.
+  Returns 0 on success, 1 when a plugin cannot be locked or a callback
+  returns non-zero.
+*/
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+  int error= 0;
+  Binlog_transmit_param param;
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, transmit_stop, thd, (&param));
+  return error;
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+                                             String *packet)
+{
+  /* Maximum extra header size for each observer. 32 bytes should be
+     enough for each observer to reserve its extra header; if this
+     later turns out to be too small, RESERVE_HEADER_SIZE can be
+     increased. /HEZX
+  */
+#define RESERVE_HEADER_SIZE 32
+  unsigned char header[RESERVE_HEADER_SIZE];
+  ulong hlen;
+  Binlog_transmit_param param;
+  param.flags= flags;
+  param.server_id= thd->server_id;
+
+  int ret= 0;
+  read_lock();
+  Observer_info_iterator iter= observer_info_iter();
+  Observer_info *info= iter++;
+  for (; info; info= iter++)
+  {
+    plugin_ref plugin=
+      my_plugin_lock(thd, &info->plugin);
+    if (!plugin)
+    {
+      ret= 1;
+      break;
+    }
+    hlen= 0;                  /* observers without the callback reserve nothing */
+    if (((Observer *)info->observer)->reserve_header
+        && ((Observer *)info->observer)->reserve_header(&param,
+                                                        header,
+                                                        RESERVE_HEADER_SIZE,
+                                                        &hlen))
+    {
+      ret= 1;
+      break;
+    }
+    if (hlen == 0)
+      continue;
+    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+    {
+      ret= 1;                 /* observer claimed too much, or append failed */
+      break;
+    }
+  }
+  unlock();
+  return ret;
+}
+
+/*
+  Call the before_send_event callback of every binlog transmit
+  observer, passing the packet buffer and the binlog file name without
+  its directory part. Returns 0 on success, 1 when a plugin cannot be
+  locked or a callback returns non-zero.
+*/
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+                                                String *packet,
+                                                const char *log_file,
+                                                my_off_t log_pos)
+{
+  int error= 0;
+  Binlog_transmit_param param;
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, before_send_event, thd,
+                   (&param, (uchar *)packet->c_ptr(),
+                    packet->length(),
+                    log_file+dirname_length(log_file), log_pos));
+  return error;
+}
+
+/*
+  Call the after_send_event callback of every binlog transmit observer
+  with the packet buffer and its length. Returns 0 on success, 1 when a
+  plugin cannot be locked or a callback returns non-zero.
+*/
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+                                               String *packet)
+{
+  int error= 0;
+  Binlog_transmit_param param;
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, after_send_event, thd,
+                   (&param, packet->c_ptr(), packet->length()));
+  return error;
+}
+
+/*
+  Call the after_reset_master callback of every binlog transmit
+  observer. Returns 0 on success, 1 when a plugin cannot be locked or a
+  callback returns non-zero.
+*/
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+{
+  int error= 0;
+  Binlog_transmit_param param;
+
+  param.flags= flags;
+  FOREACH_OBSERVER(error, after_reset_master, thd, (&param));
+  return error;
+}
+
+/*
+  Fill the observer parameter from the master info: connection
+  coordinates, current master log name/position and the MYSQL handle.
+  server_id is not set here.
+*/
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+                                          Master_info *mi)
+{
+  /* master host, user and port */
+  param->host= mi->host;
+  param->user= mi->user;
+  param->port= mi->port;
+
+  /* position in the master's binary log */
+  param->master_log_name= mi->master_log_name;
+  param->master_log_pos= mi->master_log_pos;
+
+  /* the connection to master */
+  param->mysql= mi->mysql;
+}
+
+/*
+  Call the thread_start callback of every relay IO observer.
+  Returns 0 on success, 1 when a plugin cannot be locked or a callback
+  returns non-zero.
+*/
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, thread_start, thd, (&param));
+  return error;
+}
+
+
+/*
+  Call the thread_stop callback of every relay IO observer.
+  Returns 0 on success, 1 when a plugin cannot be locked or a callback
+  returns non-zero.
+*/
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, thread_stop, thd, (&param));
+  return error;
+}
+
+/*
+  Call the before_request_transmit callback of every relay IO observer,
+  passing the binlog dump flags widened to uint32. Returns 0 on
+  success, 1 when a plugin cannot be locked or a callback returns
+  non-zero.
+*/
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+                                                      Master_info *mi,
+                                                      ushort flags)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, before_request_transmit, thd,
+                   (&param, (uint32)flags));
+  return error;
+}
+
+/*
+  Call the after_read_event callback of every relay IO observer with
+  the packet that was read and the output event buffer/length
+  pointers. Returns 0 on success, 1 when a plugin cannot be locked or a
+  callback returns non-zero.
+*/
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+                                               const char *packet, ulong len,
+                                               const char **event_buf,
+                                               ulong *event_len)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, after_read_event, thd,
+                   (&param, packet, len, event_buf, event_len));
+  return error;
+}
+
+/**
+  Call the after_queue_event callback of every relay IO observer.
+
+  @param thd        session whose server_id is passed to the observers
+  @param mi         master info used to fill the observer parameter
+  @param event_buf  event packet passed on to the observers
+  @param event_len  length of event_buf
+  @param synced     reported to observers as BINLOG_RELAY_IS_SYNCED
+
+  @retval 0 all callbacks succeeded
+  @retval 1 a plugin could not be locked or a callback returned non-zero
+*/
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+                                                const char *event_buf,
+                                                ulong event_len,
+                                                bool synced)
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  /* relay-log hooks use the Binlog_relay_IO_flags bits, not the storage ones */
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_RELAY_IS_SYNCED;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_queue_event, thd,
+                   (&param, event_buf, event_len, flags));
+  return ret;
+}
+
+/*
+  Call the after_reset_slave callback of every relay IO observer.
+  Returns 0 on success, 1 when a plugin cannot be locked or a callback
+  returns non-zero.
+*/
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+{
+  int error= 0;
+  Binlog_relay_IO_param param;
+
+  init_param(&param, mi);
+  FOREACH_OBSERVER(error, after_reset_slave, thd, (&param));
+  return error;
+}
+#endif /* HAVE_REPLICATION */
+
+/* Add a transaction observer; p is the internal plugin structure. */
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return transaction_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a transaction observer; p is the internal plugin structure. */
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return transaction_delegate->remove_observer(observer, plugin);
+}
+
+/* Add a binlog storage observer; p is the internal plugin structure. */
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_storage_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a binlog storage observer; p is the internal plugin structure. */
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_storage_delegate->remove_observer(observer, plugin);
+}
+
+#ifdef HAVE_REPLICATION
+/* Add a binlog transmit observer; p is the internal plugin structure. */
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_transmit_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a binlog transmit observer; p is the internal plugin structure. */
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_transmit_delegate->remove_observer(observer, plugin);
+}
+
+/* Add a binlog relay IO observer; p is the internal plugin structure. */
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_relay_io_delegate->add_observer(observer, plugin);
+}
+
+/* Remove a binlog relay IO observer; p is the internal plugin structure. */
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  st_plugin_int *plugin= (st_plugin_int *)p;
+  return binlog_relay_io_delegate->remove_observer(observer, plugin);
+}
+#endif /* HAVE_REPLICATION */

=== added file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.h	2008-09-23 15:06:18 +0000
@@ -0,0 +1,213 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+class Observer_info {
+public:
+  void *observer;            /* registered observer structure, stored untyped */
+  st_plugin_int *plugin_int; /* internal plugin structure given at registration */
+  plugin_ref plugin;         /* reference derived from plugin_int */
+
+  Observer_info(void *ob, st_plugin_int *p)
+    :observer(ob), plugin_int(p)
+  {
+    plugin= plugin_int_to_ref(plugin_int);
+  }
+};
+
+/**
+  Base class of all delegates: owns the list of registered observers
+  and the read/write lock that protects it.
+*/
+class Delegate {
+public:
+  typedef List<Observer_info> Observer_info_list;
+  typedef List_iterator<Observer_info> Observer_info_iterator;
+
+  /**
+    Register an observer.
+
+    @param observer  observer structure to add
+    @param plugin    internal plugin structure the observer belongs to
+
+    @retval FALSE the observer was added
+    @retval TRUE  the delegate is not initialized, the observer is
+                  already registered, or memory allocation failed
+  */
+  int add_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (!info)
+    {
+      info= new Observer_info(observer, plugin);
+      if (!info)
+        ret= TRUE;
+      else if (observer_info_list.push_back(info, &memroot))
+      {
+        /* never linked into the list, so it must be released here */
+        delete info;
+        ret= TRUE;
+      }
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  /**
+    Unregister an observer and release its Observer_info.
+
+    @param observer  observer structure to remove
+    @param plugin    not used by this function
+
+    @retval FALSE the observer was removed
+    @retval TRUE  the delegate is not initialized or the observer was
+                  not registered
+  */
+  int remove_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (info)
+    {
+      iter.remove();
+      /* allocated with new in add_observer(); unlinking alone would leak it */
+      delete info;
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  /* Iterator over the observer list; callers take the lock themselves. */
+  inline Observer_info_iterator observer_info_iter()
+  {
+    return Observer_info_iterator(observer_info_list);
+  }
+
+  /* True when no observer is registered; does not take the lock. */
+  inline bool is_empty()
+  {
+    return observer_info_list.is_empty();
+  }
+
+  /* The lock helpers return TRUE without locking when not initialized. */
+  inline int read_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_rdlock(&lock);
+  }
+
+  inline int write_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_wrlock(&lock);
+  }
+
+  inline int unlock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_unlock(&lock);
+  }
+
+  inline bool is_inited()
+  {
+    return inited;
+  }
+
+  /* inited stays FALSE when the lock cannot be created. */
+  Delegate()
+  {
+    inited= FALSE;
+    if (my_rwlock_init(&lock, NULL))
+      return;
+    init_sql_alloc(&memroot, 1024, 0);
+    inited= TRUE;
+  }
+  ~Delegate()
+  {
+    inited= FALSE;
+    rwlock_destroy(&lock);
+    free_root(&memroot, MYF(0));
+  }
+
+private:
+  Observer_info_list observer_info_list;
+  rw_lock_t lock;
+  MEM_ROOT memroot;          /* backs the list nodes passed to push_back() */
+  bool inited;
+};
+
+class Trans_delegate
+  :public Delegate {
+public:
+  typedef Trans_observer Observer;
+  int before_commit(THD *thd, bool all);
+  int before_rollback(THD *thd, bool all);
+  int after_commit(THD *thd, bool all);
+  int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+  :public Delegate {
+public:
+  typedef Binlog_storage_observer Observer;
+  int after_flush(THD *thd, const char *log_file,
+                  my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+  :public Delegate {
+public:
+  typedef Binlog_transmit_observer Observer;
+  int transmit_start(THD *thd, ushort flags,
+                     const char *log_file, my_off_t log_pos);
+  int transmit_stop(THD *thd, ushort flags);
+  int reserve_header(THD *thd, ushort flags, String *packet);
+  int before_send_event(THD *thd, ushort flags,
+                        String *packet, const
+                        char *log_file, my_off_t log_pos );
+  int after_send_event(THD *thd, ushort flags,
+                       String *packet);
+  int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+  :public Delegate {
+public:
+  typedef Binlog_relay_IO_observer Observer;
+  int thread_start(THD *thd, Master_info *mi);
+  int thread_stop(THD *thd, Master_info *mi);
+  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+  int after_read_event(THD *thd, Master_info *mi,
+                       const char *packet, ulong len,
+                       const char **event_buf, ulong *event_len);
+  int after_queue_event(THD *thd, Master_info *mi,
+                        const char *event_buf, ulong event_len,
+                        bool synced);
+  int after_reset_slave(THD *thd, Master_info *mi);
+private:
+  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+  If there are no observers in the delegate, we can return 0
+  immediately.
+*/
+#define RUN_HOOK(group, hook, args)             \
+  (group ##_delegate->is_empty() ?              \
+   0 : group ##_delegate->hook args)
+
+#endif /* RPL_HANDLER_H */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2008-08-08 01:39:23 +0000
+++ b/sql/slave.cc	2008-09-26 14:05:38 +0000
@@ -40,6 +40,7 @@
 #include <sql_common.h>
 #include <errmsg.h>
 #include <mysys_err.h>
+#include "rpl_handler.h"
 
 #ifdef HAVE_REPLICATION
 
@@ -68,6 +69,8 @@ ulonglong relay_log_space_limit = 0;
 int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 int events_till_abort = -1;
 
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
 enum enum_slave_reconnect_actions
 {
   SLAVE_RECON_ACT_REG= 0,
@@ -226,6 +229,10 @@ int init_slave()
     TODO: re-write this to interate through the list of files
     for multi-master
   */
+
+  if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+    goto err;
+
   active_mi= new Master_info;
 
   /*
@@ -1500,17 +1507,22 @@ static int safe_sleep(THD* thd, int sec,
 }
 
 
-static int request_dump(MYSQL* mysql, Master_info* mi,
-                        bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+			bool *suppress_warnings)
 {
   uchar buf[FN_REFLEN + 10];
   int len;
-  int binlog_flags = 0; // for now
+  ushort binlog_flags = 0; // for now
   char* logname = mi->master_log_name;
   DBUG_ENTER("request_dump");
   
   *suppress_warnings= FALSE;
 
+  if (RUN_HOOK(binlog_relay_io,
+               before_request_transmit,
+               (thd, mi, binlog_flags)))
+    DBUG_RETURN(1);
+  
   // TODO if big log files: Change next to int8store()
   int4store(buf, (ulong) mi->master_log_pos);
   int2store(buf + 4, binlog_flags);
@@ -2135,6 +2147,12 @@ pthread_handler_t handle_slave_io(void *
                             mi->master_log_name,
                             llstr(mi->master_log_pos,llbuff)));
 
+  /* This must be called before running any binlog_relay_io hooks */
+  my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+  if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+    goto err;
+
   if (!(mi->mysql = mysql = mysql_init(NULL)))
   {
     mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2209,7 +2227,7 @@ connected:
   while (!io_slave_killed(thd,mi))
   {
     thd_proc_info(thd, "Requesting binlog dump");
-    if (request_dump(mysql, mi, &suppress_warnings))
+    if (request_dump(thd, mysql, mi, &suppress_warnings))
     {
       sql_print_error("Failed on request_dump()");
       if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2229,6 +2247,7 @@ requesting master dump") ||
           goto err;
         goto connected;
       });
+    const char *event_buf;
 
     while (!io_slave_killed(thd,mi))
     {
@@ -2284,10 +2303,27 @@ Stopping slave I/O thread due to out-of-
 
       retry_count=0;                    // ok event, reset retry counter
       thd_proc_info(thd, "Queueing master event to the relay log");
-      if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
+      event_buf= (const char*)mysql->net.read_pos + 1;
+      if (RUN_HOOK(binlog_relay_io, after_read_event,
+                   (thd, mi,(const char*)mysql->net.read_pos + 1,
+                    event_len, &event_buf, &event_len)))
+      {
+        sql_print_error("Failed to run 'after_read_event' hook");
+        goto err;
+      }
+
+      /* XXX: 'synced' should be updated by queue_event to indicate
+         whether event has been synced to disk */
+      bool synced= 0;
+      if (queue_event(mi, event_buf, event_len))
       {
         goto err;
       }
+
+      if (RUN_HOOK(binlog_relay_io, after_queue_event,
+                   (thd, mi, event_buf, event_len, synced)))
+        goto err;
+
       if (flush_master_info(mi, 1))
       {
         sql_print_error("Failed to flush master info file");
@@ -2334,6 +2370,7 @@ err:
   sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
                   IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
   pthread_mutex_lock(&LOCK_thread_count);
+  RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
   thd->query = thd->db = 0; // extra safety
   thd->query_length= thd->db_length= 0;
   pthread_mutex_unlock(&LOCK_thread_count);
@@ -3454,6 +3491,64 @@ static int safe_reconnect(THD* thd, MYSQ
 }
 
 
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+  THD *thd= current_thd;
+  Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+  if (!mi)
+  {
+    sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+    return NULL;
+  }
+
+  bool allocated= false;
+  
+  if (!mysql)
+  {
+    if(!(mysql= mysql_init(NULL)))
+      return NULL;
+    allocated= true;
+  }
+
+  /*
+    XXX: copied from connect_to_master, this function should not
+    change the slave status, so we cannot use connect_to_master
+    directly
+    
+    TODO: make this part a separate function to eliminate duplication
+  */
+  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+  if (mi->ssl)
+  {
+    mysql_ssl_set(mysql,
+                  mi->ssl_key[0]?mi->ssl_key:0,
+                  mi->ssl_cert[0]?mi->ssl_cert:0,
+                  mi->ssl_ca[0]?mi->ssl_ca:0,
+                  mi->ssl_capath[0]?mi->ssl_capath:0,
+                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
+    mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+                  &mi->ssl_verify_server_cert);
+  }
+#endif
+
+  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+  /* This one is not strictly needed but we have it here for completeness */
+  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+  if (io_slave_killed(thd, mi)
+      || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+                             mi->port, 0, 0))
+  {
+    if (allocated)
+      mysql_close(mysql);                       // this will free the object
+    return NULL;
+  }
+  return mysql;
+}
+
 /*
   Store the file and position where the execute-slave thread are in the
   relay log.

=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc	2008-10-01 12:02:28 +0000
+++ b/sql/sql_insert.cc	2008-10-07 17:04:28 +0000
@@ -3806,10 +3806,10 @@ void select_create::abort()
   DBUG_ENTER("select_create::abort");
 
   /*
-    In select_insert::abort() we roll back the statement, including
-    truncating the transaction cache of the binary log. To do this, we
-    pretend that the statement is transactional, even though it might
-    be the case that it was not.
+    We roll back the statement here, including truncating the
+    transaction cache of the binary log. To do this, we pretend that
+    the statement is transactional, even though it might be the case
+    that it was not.
 
     We roll back the statement prior to deleting the table and prior
     to releasing the lock on the table, since there might be potential
@@ -3823,6 +3823,7 @@ void select_create::abort()
   tmp_disable_binlog(thd);
   select_insert::abort();
   thd->transaction.stmt.modified_non_trans_table= FALSE;
+  trans_rollback_stmt(thd);
   reenable_binlog(thd);
   thd->binlog_flush_pending_rows_event(TRUE);
 

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2008-10-02 14:03:57 +0000
+++ b/sql/sql_parse.cc	2008-10-07 17:04:28 +0000
@@ -21,6 +21,7 @@
 #include <m_ctype.h>
 #include <myisam.h>
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 #include "sp_head.h"
 #include "sp.h"

=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc	2008-08-19 15:58:48 +0000
+++ b/sql/sql_plugin.cc	2008-09-23 14:33:18 +0000
@@ -20,14 +20,6 @@
 #define REPORT_TO_LOG  1
 #define REPORT_TO_USER 2
 
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
 extern struct st_mysql_plugin *mysqld_builtins[];
 
 char *opt_plugin_load= NULL;
@@ -44,7 +36,8 @@ const LEX_STRING plugin_type_names[MYSQL
   { C_STRING_WITH_LEN("FTPARSER") },
   { C_STRING_WITH_LEN("DAEMON") },
   { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
-  { C_STRING_WITH_LEN("AUDIT") }
+  { C_STRING_WITH_LEN("AUDIT") },
+  { C_STRING_WITH_LEN("REPLICATION") },
 };
 
 extern int initialize_schema_table(st_plugin_int *plugin);
@@ -89,7 +82,8 @@ static int min_plugin_info_interface_ver
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
   MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
-  MYSQL_AUDIT_INTERFACE_VERSION
+  MYSQL_AUDIT_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION
 };
 static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
 {
@@ -98,7 +92,8 @@ static int cur_plugin_info_interface_ver
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
   MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
-  MYSQL_AUDIT_INTERFACE_VERSION
+  MYSQL_AUDIT_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION
 };
 
 static bool initialized= 0;

=== modified file 'sql/sql_plugin.h'
--- a/sql/sql_plugin.h	2008-06-27 20:56:54 +0000
+++ b/sql/sql_plugin.h	2008-09-23 14:33:18 +0000
@@ -18,6 +18,14 @@
 
 class sys_var;
 
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) A
+#define plugin_int_to_ref(A) A
+#else
+#define plugin_ref_to_int(A) (A ? A[0] : NULL)
+#define plugin_int_to_ref(A) &(A)
+#endif
+
 /*
   the following flags are valid for plugin_init()
 */

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2008-09-04 18:30:34 +0000
+++ b/sql/sql_repl.cc	2008-09-23 14:33:18 +0000
@@ -21,6 +21,7 @@
 #include "log_event.h"
 #include "rpl_filter.h"
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 int max_binlog_dump_events = 0; // unlimited
 my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -378,11 +379,36 @@ static int send_heartbeat_event(NET* net
   {
     DBUG_RETURN(-1);
   }
-  packet->set("\0", 1, &my_charset_bin);
   DBUG_RETURN(0);
 }
 
 /*
+  Reset thread transmit packet buffer for event sending
+
+  This function allocates header bytes for event transmission, and
+  should be called before storing the event data in the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+                                 ulong *ev_offset, const char **errmsg)
+{
+  int ret= 0;
+  String *packet= &thd->packet;
+
+  /* reserve and set default header */
+  packet->length(0);
+  packet->set("\0", 1, &my_charset_bin);
+
+  if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+  {
+    *errmsg= "Failed to run hook 'reserve_header'";
+    my_errno= ER_UNKNOWN_ERROR;
+    ret= 1;
+  }
+  *ev_offset= packet->length();
+  return ret;
+}
+
+/*
   TODO: Clean up loop to only have one call to send_file()
 */
 
@@ -392,6 +418,9 @@ void mysql_binlog_send(THD* thd, char* l
   LOG_INFO linfo;
   char *log_file_name = linfo.log_file_name;
   char search_file_name[FN_REFLEN], *name;
+
+  ulong ev_offset;
+
   IO_CACHE log;
   File file = -1;
   String* packet = &thd->packet;
@@ -407,6 +436,9 @@ void mysql_binlog_send(THD* thd, char* l
   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
   bzero((char*) &log,sizeof(log));
+  sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+                        thd->server_id, log_ident, (ulong)pos);
+
   /* 
      heartbeat_period from @master_heartbeat_period user variable
   */
@@ -423,6 +455,13 @@ void mysql_binlog_send(THD* thd, char* l
     coord->file_name= log_file_name; // initialization basing on what slave remembers
     coord->pos= pos;
   }
+  if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+  {
+    errmsg= "Failed to run hook 'transmit_start'";
+    my_errno= ER_UNKNOWN_ERROR;
+    goto err;
+  }
+  
 #ifndef DBUG_OFF
   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
   {
@@ -477,11 +516,9 @@ impossible position";
     goto err;
   }
 
-  /*
-    We need to start a packet with something other than 255
-    to distinguish it from error
-  */
-  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+  /* reset transmit packet for the fake rotate event below */
+  if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+    goto err;
 
   /*
     Tell the client about the log name with a fake Rotate event;
@@ -521,7 +558,7 @@ impossible position";
     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
     goto err;
   }
-  packet->set("\0", 1, &my_charset_bin);
+
   /*
     Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
     this larger than the corresponding packet (query) sent 
@@ -537,6 +574,11 @@ impossible position";
   log_lock = mysql_bin_log.get_log_lock();
   if (pos > BIN_LOG_HEADER_SIZE)
   {
+    /* reset transmit packet for the event read from binary log
+       file */
+    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+      goto err;
+
      /*
        Try to find a Format_description_log_event at the beginning of
        the binlog
@@ -544,29 +586,30 @@ impossible position";
      if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
      {
        /*
-         The packet has offsets equal to the normal offsets in a binlog
-         event +1 (the first character is \0).
+         The packet has offsets equal to the normal offsets in a
+         binlog event + ev_offset (the first ev_offset characters are
+         the header (default \0)).
        */
        DBUG_PRINT("info",
                   ("Looked for a Format_description_log_event, found event type %d",
-                   (*packet)[EVENT_TYPE_OFFSET+1]));
-       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+                   (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+       if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
        {
-         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                        LOG_EVENT_BINLOG_IN_USE_F);
-         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
            mark that this event with "log_pos=0", so the slave
            should not increment master's binlog position
            (rli->group_master_log_pos)
          */
-         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
          /*
            if reconnect master sends FD event with `created' as 0
            to avoid destroying temp tables.
           */
          int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
-                   ST_CREATED_OFFSET+1, (ulong) 0);
+                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
          /* send it */
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
          {
@@ -592,8 +635,6 @@ impossible position";
          Format_description_log_event will be found naturally if it is written.
        */
      }
-     /* reset the packet as we wrote to it in any case */
-     packet->set("\0", 1, &my_charset_bin);
   } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
   else
   {
@@ -605,6 +646,12 @@ impossible position";
 
   while (!net->error && net->vio != 0 && !thd->killed)
   {
+    Log_event_type event_type= UNKNOWN_EVENT;
+
+    /* reset the transmit packet for the event read from binary log
+       file */
+    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+      goto err;
     while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
     {
 #ifndef DBUG_OFF
@@ -620,17 +667,26 @@ impossible position";
         log's filename does not change while it's active
       */
       if (coord)
-        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+        coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
 
-      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+      event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+      if (event_type == FORMAT_DESCRIPTION_EVENT)
       {
-        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                       LOG_EVENT_BINLOG_IN_USE_F);
-        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+        (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
-      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+      else if (event_type == STOP_EVENT)
         binlog_can_be_corrupted= FALSE;
 
+      pos = my_b_tell(&log);
+      if (RUN_HOOK(binlog_transmit, before_send_event,
+                   (thd, flags, packet, log_file_name, pos)))
+      {
+        my_errno= ER_UNKNOWN_ERROR;
+        errmsg= "run 'before_send_event' hook failed";
+        goto err;
+      }
       if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
       {
 	errmsg = "Failed on my_net_write()";
@@ -638,9 +694,8 @@ impossible position";
 	goto err;
       }
 
-      DBUG_PRINT("info", ("log event code %d",
-			  (*packet)[LOG_EVENT_OFFSET+1] ));
-      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+      DBUG_PRINT("info", ("log event code %d", event_type));
+      if (event_type == LOAD_EVENT)
       {
 	if (send_file(thd))
 	{
@@ -649,7 +704,17 @@ impossible position";
 	  goto err;
 	}
       }
-      packet->set("\0", 1, &my_charset_bin);
+
+      if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+      {
+        errmsg= "Failed to run hook 'after_send_event'";
+        my_errno= ER_UNKNOWN_ERROR;
+        goto err;
+      }
+
+      /* reset transmit packet for next loop */
+      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+        goto err;
     }
 
     /*
@@ -700,6 +765,11 @@ impossible position";
 	}
 #endif
 
+        /* reset the transmit packet for the event read from binary log
+           file */
+        if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+          goto err;
+        
 	/*
 	  No one will update the log while we are reading
 	  now, but we'll be quick and just read one record
@@ -718,6 +788,7 @@ impossible position";
 	  read_packet = 1;
           if (coord)
             coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+          event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
 	  break;
 
 	case LOG_READ_EOF:
@@ -753,6 +824,9 @@ impossible position";
                   sql_print_information("the rest of heartbeat info skipped ...");
               }
 #endif
+              /* reset transmit packet for the heartbeat event */
+              if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+                goto err;
               if (send_heartbeat_event(net, packet, coord))
               {
                 errmsg = "Failed on my_net_write()";
@@ -778,8 +852,17 @@ impossible position";
 	}
 
 	if (read_packet)
-	{
-	  thd_proc_info(thd, "Sending binlog event to slave");
+        {
+          thd_proc_info(thd, "Sending binlog event to slave");
+          pos = my_b_tell(&log);
+          if (RUN_HOOK(binlog_transmit, before_send_event,
+                       (thd, flags, packet, log_file_name, pos)))
+          {
+            my_errno= ER_UNKNOWN_ERROR;
+            errmsg= "run 'before_send_event' hook failed";
+            goto err;
+          }
+	  
 	  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 	  {
 	    errmsg = "Failed on my_net_write()";
@@ -787,7 +870,7 @@ impossible position";
 	    goto err;
 	  }
 
-	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+	  if (event_type == LOAD_EVENT)
 	  {
 	    if (send_file(thd))
 	    {
@@ -796,11 +879,13 @@ impossible position";
 	      goto err;
 	    }
 	  }
-	  packet->set("\0", 1, &my_charset_bin);
-	  /*
-	    No need to net_flush because we will get to flush later when
-	    we hit EOF pretty quick
-	  */
+
+          if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+          {
+            my_errno= ER_UNKNOWN_ERROR;
+            errmsg= "Failed to run hook 'after_send_event'";
+            goto err;
+          }
 	}
 
 	if (fatal_error)
@@ -836,6 +921,10 @@ impossible position";
       end_io_cache(&log);
       (void) my_close(file, MYF(MY_WME));
 
+      /* reset transmit packet for the possible fake rotate event */
+      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+        goto err;
+      
       /*
         Call fake_rotate_event() in case the previous log (the one which
         we have just finished reading) did not contain a Rotate event
@@ -853,8 +942,6 @@ impossible position";
 	goto err;
       }
 
-      packet->length(0);
-      packet->append('\0');
       if (coord)
         coord->file_name= log_file_name; // reset to the next
     }
@@ -864,6 +951,7 @@ end:
   end_io_cache(&log);
   (void)my_close(file, MYF(MY_WME));
 
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
   my_eof(thd);
   thd_proc_info(thd, "Waiting to finalize termination");
   pthread_mutex_lock(&LOCK_thread_count);
@@ -874,6 +962,7 @@ end:
 err:
   thd_proc_info(thd, "Waiting to finalize termination");
   end_io_cache(&log);
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
   /*
     Exclude  iteration through thread list
     this is needed for purge_logs() - it will iterate through
@@ -1134,6 +1223,7 @@ int reset_slave(THD *thd, Master_info* m
     goto err;
   }
 
+  RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
 err:
   unlock_slave_threads(mi);
   if (error)
@@ -1416,7 +1506,11 @@ int reset_master(THD* thd)
                ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
     return 1;
   }
-  return mysql_bin_log.reset_logs(thd);
+
+  if (mysql_bin_log.reset_logs(thd))
+    return 1;
+  RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+  return 0;
 }
 
 int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1926,5 +2020,3 @@ int init_replication_sys_vars()
 }
 
 #endif /* HAVE_REPLICATION */
-
-

=== modified file 'sql/transaction.cc'
--- a/sql/transaction.cc	2008-08-12 22:30:55 +0000
+++ b/sql/transaction.cc	2008-09-23 14:33:18 +0000
@@ -20,6 +20,7 @@
 
 #include "transaction.h"
 #include "mysql_priv.h"
+#include "rpl_handler.h"
 
 #ifdef WITH_MARIA_STORAGE_ENGINE
 #include "../storage/maria/ha_maria.h"
@@ -97,6 +98,14 @@ bool trans_commit(THD *thd)
 
   thd->server_status&= ~SERVER_STATUS_IN_TRANS;
   res= ha_commit_trans(thd, TRUE);
+  if (res)
+    /*
+      if res is non-zero, then ha_commit_trans has rolled back the
+      transaction, so the hooks for rollback will be called.
+    */
+    RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+  else
+    RUN_HOOK(transaction, after_commit, (thd, FALSE));
   thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= FALSE;
   thd->lex->start_transaction_opt= 0;
@@ -163,6 +172,7 @@ bool trans_rollback(THD *thd)
 
   thd->server_status&= ~SERVER_STATUS_IN_TRANS;
   res= ha_rollback_trans(thd, TRUE);
+  RUN_HOOK(transaction, after_rollback, (thd, FALSE));
   thd->options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= FALSE;
   thd->lex->start_transaction_opt= 0;
@@ -192,6 +202,15 @@ bool trans_commit_stmt(THD *thd)
   int res= FALSE;
   if (thd->transaction.stmt.ha_list)
     res= ha_commit_trans(thd, FALSE);
+  
+  if (res)
+    /*
+      if res is non-zero, then ha_commit_trans has rolled back the
+      transaction, so the hooks for rollback will be called.
+    */
+    RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+  else
+    RUN_HOOK(transaction, after_commit, (thd, FALSE));
   DBUG_RETURN(test(res));
 }
 
@@ -216,6 +235,8 @@ bool trans_rollback_stmt(THD *thd)
       ha_rollback_trans(thd, TRUE);
   }
 
+  RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+
   DBUG_RETURN(FALSE);
 }
 

=== modified file 'storage/falcon/Cache.cpp'
--- a/storage/falcon/Cache.cpp	2008-09-27 18:04:19 +0000
+++ b/storage/falcon/Cache.cpp	2008-10-02 23:06:04 +0000
@@ -50,11 +50,9 @@
 extern uint falcon_io_threads;
 
 //#define STOP_PAGE		55
-//#define CACHE_TRACE_FILE	"cache.trace"
+#define TRACE_FILE	"cache.trace"
 
-#ifdef CACHE_TRACE_FILE
 static FILE			*traceFile;
-#endif // CACHE_TRACE_FILE
 
 static const uint64 cacheHunkSize		= 1024 * 1024 * 128;
 static const int	ASYNC_BUFFER_SIZE	= 1024000;
@@ -70,50 +68,21 @@ static const char THIS_FILE[]=__FILE__;
 
 Cache::Cache(Database *db, int pageSz, int hashSz, int numBuffers)
 {
-	openTraceFile();
+	//openTraceFile();
 	database = db;
 	panicShutdown = false;
 	pageSize = pageSz;
-
-	unsigned int highBit;
-	for (highBit=0x01; highBit < (unsigned int)hashSz; highBit= highBit << 1) { }
-
-	// if there are more than 4096 buckets then lets round down
-	// else lets round up
-	if (highBit >= 0x00001000)
-		{
-		// use power of two rounded down
-		hashSize = highBit << 1;
-		}
-	else
-		{
-		// use power of two rounded up
-		hashSize = highBit;
-		}
-
-	hashMask = hashSize - 1;
+	hashSize = hashSz;
 	numberBuffers = numBuffers;
 	upperFraction = numberBuffers / 4;
 	bufferAge = 0;
 	firstDirty = NULL;
 	lastDirty = NULL;
+	numberDirtyPages = 0;
 	pageWriter = NULL;
-	hashTable = new Bdb* [hashSize];
+	hashTable = new Bdb* [hashSz];
 	memset (hashTable, 0, sizeof (Bdb*) * hashSize);
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
-    syncHashTable = new SyncObject [DEBUG_SYNC_HASH_TABLE_SIZE];
-	for (int loop = 0; loop < DEBUG_SYNC_HASH_TABLE_SIZE; loop ++)
-		syncHashTable[loop].setName("Cache::syncHashTable");
-
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
-    syncHashTable = new SyncObject [hashSize];
-	for (int loop = 0; loop < hashSize; loop ++)
-		{
-		char tmpName[128];
-		snprintf(tmpName,120,"Cache::syncHashTable[%d]",loop);
-		syncHashTable[loop].setName(tmpName);
-		}
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
+	
 	if (falcon_use_sectorcache)
 		sectorCache = new SectorCache(sectorCacheSize / SECTOR_BUFFER_SIZE, pageSize);
 
@@ -133,10 +102,9 @@ Cache::Cache(Database *db, int pageSz, i
 	ioThreads = new Thread*[numberIoThreads];
 	memset(ioThreads, 0, numberIoThreads * sizeof(ioThreads[0]));
 	flushing = false;
-
+	
 	try
 		{
-		// non-protected access to bdbs,endBdbs is OK during initialization
 		bdbs = new Bdb [numberBuffers];
 		endBdbs = bdbs + numberBuffers;
 		int remaining = 0;
@@ -155,7 +123,6 @@ Cache::Cache(Database *db, int pageSz, i
 				}
 
 			bdb->cache = this;
-			// non-protected access to bufferQueue is OK during initialization
 			bufferQueue.append(bdb);
 			bdb->buffer = (Page*) stuff;
 			stuff += pageSize;
@@ -181,17 +148,17 @@ Cache::Cache(Database *db, int pageSz, i
 
 Cache::~Cache()
 {
-
-	closeTraceFile();
+	if (traceFile)
+		closeTraceFile();
 
 	delete [] hashTable;
-	delete [] syncHashTable;
 	delete [] bdbs;
 	delete [] ioThreads;
 	delete flushBitmap;
+
 	if (falcon_use_sectorcache)
 		delete sectorCache;
-
+	
 	if (bufferHunks)
 		{
 		for (int n = 0; n < numberHunks; ++n)
@@ -204,16 +171,19 @@ Cache::~Cache()
 Bdb* Cache::probePage(Dbb *dbb, int32 pageNumber)
 {
 	ASSERT (pageNumber >= 0);
-	Bdb *bdb;
-
-	/* If we already have a buffer for this, we're done */
-	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+	Sync sync (&syncObject, "Cache::probePage");
+	sync.lock (Shared);
+	Bdb *bdb = findBdb(dbb, pageNumber);
+	
 	if (bdb)
 		{
+		bdb->incrementUseCount(ADD_HISTORY);
+		sync.unlock();
+
 		if (bdb->buffer->pageType == PAGE_free)
 			{
 			bdb->decrementUseCount(REL_HISTORY);
-
+			
 			return NULL;
 			}
 
@@ -226,49 +196,11 @@ Bdb* Cache::probePage(Dbb *dbb, int32 pa
 	return NULL;
 }
 
-Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber, int slot)
-{
-	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-		{
-		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
-			{
-			return bdb;
-			}
-		}
-
-	return NULL;
-}
-
 Bdb* Cache::findBdb(Dbb* dbb, int32 pageNumber)
 {
-	return (findBdb(dbb, pageNumber, PAGENUM_2_SLOT(pageNumber)));
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber)
-{
-	int slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
-	lockHash.lock (Shared);
-	Bdb *bdb;
-
-	bdb = findBdb(dbb, pageNumber, slot);
-	if (bdb != NULL)
-		bdb->incrementUseCount(ADD_HISTORY);
-
-	return bdb;
-}
-
-Bdb* Cache::lockFindBdbIncrementUseCount(int32 pageNumber, int slot)
-{
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::lockFindBdbIncrementUseCount");
-	lockHash.lock (Shared);
-
-	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
-		if (bdb->pageNumber == pageNumber)
-		{
-			bdb->incrementUseCount(ADD_HISTORY);
+	for (Bdb *bdb = hashTable [pageNumber % hashSize]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
 			return bdb;
-		}
 
 	return NULL;
 }
@@ -278,106 +210,95 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 	if (panicShutdown)
 		{
 		Thread *thread = Thread::getThread("Cache::fetchPage");
-
+		
 		if (thread->pageMarks == 0)
 			throw SQLError(RUNTIME_ERROR, "Emergency shut is underway");
 		}
 
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE			
 		if (pageNumber == STOP_PAGE)
  			Log::debug("fetching page %d/%d\n", pageNumber, dbb->tableSpaceId);
 #endif
 
 	ASSERT (pageNumber >= 0);
+	int slot = pageNumber % hashSize;
+	LockType actual = lockType;
+	Sync sync (&syncObject, "Cache::fetchPage");
+	sync.lock (Shared);
+	int hit = 0;
+
+	/* If we already have a buffer for this go, we're done */
+
 	Bdb *bdb;
 
-	/* If we already have a buffer for this, we're done */
-	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+			{
+			//syncObject.validateShared("Cache::fetchPage");
+			bdb->incrementUseCount(ADD_HISTORY);
+			sync.unlock();
+			bdb->addRef(lockType  COMMA_ADD_HISTORY);
+			bdb->decrementUseCount(REL_HISTORY);
+			hit = 1;
+			break;
+			}
+
 	if (!bdb)
 		{
-		// getFreeBuffer() locks a hash bucket to remove the candidate bdb
-		// if we locked our hash bucket before the call then we could have
-		// a deadlock
-		// thus we get the free buffer before we lock the hash bucket we will
-		// be inserting into.  This avoids a dead lock but generates a race
-		// we take care of the race by reversing the getFreeBuffer() work
-		// when we lose the race
-		Bdb *bdbAvailable;
-		int slot = PAGENUM_2_SLOT(pageNumber);
-		Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
-
-		bdbAvailable = getFreeBuffer();
-		/* assume we'll be inserting this new BDB.  Set new page number. */
-		bdbAvailable->pageNumber = pageNumber;
-		bdbAvailable->dbb = dbb;
+		sync.unlock();
+		actual = Exclusive;
+		sync.lock(Exclusive);
+
+		for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+			if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+				{
+				//syncObject.validateExclusive("Cache::fetchPage (retry)");
+				bdb->incrementUseCount(ADD_HISTORY);
+				sync.unlock();
+				bdb->addRef(lockType  COMMA_ADD_HISTORY);
+				bdb->decrementUseCount(REL_HISTORY);
+				hit = 2;
+				break;
+				}
 
-		lockHash.lock(Exclusive);
-		bdb = findBdb(dbb, pageNumber, slot);
 		if (!bdb)
 			{
-			// we won the race so lets use the free bdb
-			// relink into hash table
-			bdbAvailable->hash = hashTable [slot];
-			hashTable [slot] = bdbAvailable;
-			lockHash.unlock();
+			bdb = findBuffer(dbb, pageNumber, actual);
+			moveToHead(bdb);
+			sync.unlock();
 
-			bdb = bdbAvailable;
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE			
 			if (bdb->pageNumber == STOP_PAGE)
 				Log::debug("reading page %d/%d\n", bdb->pageNumber, dbb->tableSpaceId);
 #endif
-
+			
 			Priority priority(database->ioScheduler);
-			priority.schedule(PRIORITY_MEDIUM);
+			priority.schedule(PRIORITY_MEDIUM);	
 			if (falcon_use_sectorcache)
 				sectorCache->readPage(bdb);
 			else
 				dbb->readPage(bdb);
-
 			priority.finished();
 #ifdef HAVE_PAGE_NUMBER
 			ASSERT(bdb->buffer->pageNumber == pageNumber);
-#endif
-			if (Exclusive != lockType)
+#endif			
+			if (actual != lockType)
 				bdb->downGrade(lockType);
-
-			}
-		else
-			{
-			//syncObject.validateExclusive("Cache::fetchPage (retry)");
-			bdb->incrementUseCount(ADD_HISTORY);
-			lockHash.unlock();
-			bdb->addRef(lockType  COMMA_ADD_HISTORY);
-			bdb->decrementUseCount(REL_HISTORY);
-			moveToHead(bdb);
-
-			// lost a race.  put our available back to useable
-			// side effect, bdbAvailable will have to age again before we re-use it.
-			bdbAvailable->hash = NULL;
-			bdbAvailable->pageNumber = -1;
-			bdbAvailable->dbb = NULL;
-			bdbAvailable->release(REL_HISTORY);
 			}
 		}
-	else
-		{
-		bdb->addRef(lockType  COMMA_ADD_HISTORY);
-		bdb->decrementUseCount(REL_HISTORY);
-		moveToHead(bdb);
-		}
 
 	Page *page = bdb->buffer;
-
+	
 	/***
 	if (page->checksum != (short) pageNumber)
 		FATAL ("page %d wrong page number, got %d\n",
 				 bdb->pageNumber, page->checksum);
 	***/
-
+	
 	if (pageType && page->pageType != pageType)
 		{
 		/*** future code
-		bdb->release(REL_HISTORY);
+		bdb->release();
 		throw SQLError (DATABASE_CORRUPTION, "page %d wrong page type, expected %d got %d\n",
 						pageNumber, pageType, page->pageType);
 		***/
@@ -385,6 +306,14 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 				 bdb->pageNumber, dbb->tableSpaceId, pageType, page->pageType);
 		}
 
+	// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
+	
+	if (bdb->age < bufferAge - (uint64) upperFraction)
+		{
+		sync.lock (Exclusive);
+		moveToHead (bdb);
+		}
+		
 	ASSERT (bdb->pageNumber == pageNumber);
 	ASSERT (bdb->dbb == dbb);
 	ASSERT (bdb->useCount > 0);
@@ -394,74 +323,43 @@ Bdb* Cache::fetchPage(Dbb *dbb, int32 pa
 
 Bdb* Cache::fakePage(Dbb *dbb, int32 pageNumber, PageType type, TransId transId)
 {
-	Bdb *bdb;
+	Sync sync(&syncObject, "Cache::fakePage");
+	sync.lock(Exclusive);
+	int	slot = pageNumber % hashSize;
 
-#ifdef STOP_PAGE
+#ifdef STOP_PAGE			
 	if (pageNumber == STOP_PAGE)
 		Log::debug("faking page %d/%d\n",pageNumber, dbb->tableSpaceId);
 #endif
 
 	/* If we already have a buffer for this, we're done */
-	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
-	if (!bdb)
-		{
-		// getFreeBuffer() locks a hash bucket to remove the candidate bdb
-		// if we locked our hash bucket before the call then we could have
-		// a deadlock
-		// thus we get the free buffer before we lock the hash bucket we will
-		// be inserting into.  This avoids a dead lock but generates a race
-		// we take care of the race by reversing the getFreeBuffer() work
-		// when we lose the race
-		Bdb *bdbAvailable;
-		int slot = PAGENUM_2_SLOT(pageNumber);
-		Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::fetchPage");
-
-		bdbAvailable = getFreeBuffer();
-		/* assume we'll be inserting this new BDB.  Set new page number. */
-		bdbAvailable->pageNumber = pageNumber;
-		bdbAvailable->dbb = dbb;
 
-		lockHash.lock(Exclusive);
-		bdb = findBdb(dbb, pageNumber, slot);
-		if (!bdb)
-			{
-			// we won the race so lets use the free bdb
-			// relink into hash table
-			bdbAvailable->hash = hashTable [slot];
-			hashTable [slot] = bdbAvailable;
-			lockHash.unlock();
+	Bdb *bdb;
 
-			bdb = bdbAvailable;
-			}
-		else
+	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
 			{
-			//syncObject.validateExclusive("Cache::fetchPage (retry)");
-			bdb->incrementUseCount(ADD_HISTORY);
-			lockHash.unlock();
+			if (bdb->syncObject.isLocked())
+				{
+				// The pageWriter may still be cleaning up this freed page with a shared lock
+				ASSERT(bdb->buffer->pageType == PAGE_free);
+				ASSERT(bdb->syncObject.getState() >= 0);
+				}
+				
 			bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
-			bdb->decrementUseCount(REL_HISTORY);
-			moveToHead(bdb);
-
-			// lost a race.  put our available back to useable
-			// side effect, bdbAvailable will have to age again before we re-use it.
-			bdbAvailable->hash = NULL;
-			bdbAvailable->pageNumber = -1;
-			bdbAvailable->dbb = NULL;
-			bdbAvailable->release(REL_HISTORY);
+			
+			break;
 			}
-		}
-	else
-		{
-		bdb->addRef(Exclusive  COMMA_ADD_HISTORY);
-		bdb->decrementUseCount(REL_HISTORY);
-		moveToHead(bdb);
-		}
+
+	if (!bdb)
+		bdb = findBuffer(dbb, pageNumber, Exclusive);
 
 	if (!dbb->isReadOnly)
 		bdb->mark(transId);
-
+		
 	memset(bdb->buffer, 0, pageSize);
 	bdb->setPageHeader(type);
+	moveToHead(bdb);
 
 	return bdb;
 }
@@ -469,19 +367,19 @@ Bdb* Cache::fakePage(Dbb *dbb, int32 pag
 void Cache::flush(int64 arg)
 {
 	Sync flushLock(&syncFlush, "Cache::flush(1)");
-	Sync dirtyLock(&syncDirty, "Cache::flush(2)");
+	Sync sync(&syncDirty, "Cache::flush(2)");
 	flushLock.lock(Exclusive);
-
+	
 	if (flushing)
 		return;
 
 	syncWait.lock(NULL, Exclusive);
+	sync.lock(Shared);
 	//Log::debug(%d: "Initiating flush\n", dbb->deltaTime);
 	flushArg = arg;
 	flushPages = 0;
 	physicalWrites = 0;
-
-	dirtyLock.lock(Shared);
+	
 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		{
 		bdb->flushIt = true;
@@ -489,14 +387,14 @@ void Cache::flush(int64 arg)
 		++flushPages;
 		}
 
-	dirtyLock.unlock();
-
-	analyzeFlush();
+	if (traceFile)
+		analyzeFlush();
 
 	flushStart = database->timestamp;
 	flushing = true;
+	sync.unlock();
 	flushLock.unlock();
-
+	
 	for (int n = 0; n < numberIoThreads; ++n)
 		if (ioThreads[n])
 			ioThreads[n]->wake();
@@ -504,135 +402,69 @@ void Cache::flush(int64 arg)
 
 void Cache::moveToHead(Bdb * bdb)
 {
-	// If buffer has moved out of the upper "fraction" of the LRU queue, move it back up
-	// non-protected access to age is harmless since it is fuzzy anyway
-	if (bdb->age < bufferAge - (uint64) upperFraction)
-		{
-		Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::moveToHead");
-
-		bufferQueueLock.lock (Exclusive);
-		bdb->age = bufferAge++;
-		bufferQueue.remove(bdb);
-		bufferQueue.prepend(bdb);
-		//validateUnique (bdb);
-		}
-}
-
-void Cache::moveToHeadAlreadyLocked(Bdb * bdb)
-{
 	bdb->age = bufferAge++;
 	bufferQueue.remove(bdb);
 	bufferQueue.prepend(bdb);
 	//validateUnique (bdb);
 }
 
-Bdb* Cache::getFreeBuffer(void)
+Bdb* Cache::findBuffer(Dbb *dbb, int pageNumber, LockType lockType)
 {
-	Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::getFreeBuffer");
-	unsigned int count;
+	//syncObject.validateExclusive("Cache::findBuffer");
+	int	slot = pageNumber % hashSize;
+	Sync sync(&syncDirty, "Cache::findBuffer");
+	
+	/* Find least recently used, not-in-use buffer */
+
 	Bdb *bdb;
 
 	// Find a candidate BDB.
+	
 	for (;;)
 		{
-		bufferQueueLock.lock (Exclusive);
-		// find a candidate that is NOT in use and NOT dirty and in the tail fraction of the LRU
-		for (count = 0, bdb = bufferQueue.last; bdb ; bdb = bdb->prior, count++)
-			{
-			if (count >= upperFraction)
-				{
-				bdb = NULL;
-				break;
-				}
-
+		for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
 			if (bdb->useCount == 0)
-				{
-				if (!bdb->isDirty)
-					{
-					bdb->incrementUseCount(REL_HISTORY);
-					moveToHeadAlreadyLocked(bdb);
-					break;
-					}
-				}
-			else
-				{
-				// get this one out of the way so we don't search it every time
-				moveToHeadAlreadyLocked(bdb);
-#ifdef CHECK_STALLED_BDB
-				bdb->stallCount++;
-				if ((bdb->stallCount & 0x03) == 0x03)
-					{
-					Log::debug("Page %d is in use and aged %d times\n",
-							   bdb->pageNumber, bdb->stallCount);
-					}
-#endif // CHECK_STALLED_BDB
-				}
-			}
-		if (!bdb)
-			// find a candidate that is NOT in use, could be dirty
-			for (bdb = bufferQueue.last; bdb; bdb = bdb->prior)
-				if (bdb->useCount == 0)
-					{
-					bdb->incrementUseCount(REL_HISTORY);
-					moveToHeadAlreadyLocked(bdb);
-					break;
-					}
-
-		bufferQueueLock.unlock();
+				break;
 
 		if (!bdb)
 			throw SQLError(RUNTIME_ERROR, "buffer pool is exhausted\n");
+			
+		if (!bdb->isDirty)
+			break;
+			
+		writePage (bdb, WRITE_TYPE_REUSE);
+		}
 
-		if (bdb->pageNumber >= 0)
-			{
-			int	slotRemove = PAGENUM_2_SLOT(bdb->pageNumber);
-			Sync lockHashRemove (&syncHashTable[PAGENUM_2_LOCK_INDEX(bdb->pageNumber, slotRemove)], "Cache::getFreeBuffer");
-			lockHashRemove.lock(Exclusive);
+	/* Unlink its old incarnation from the page/hash table */
 
-			if (bdb->useCount != 1)
+	if (bdb->pageNumber >= 0)
+		for (Bdb **ptr = hashTable + bdb->pageNumber % hashSize;; ptr = &(*ptr)->hash)
+			if (*ptr == bdb)
 				{
-				// we lost a race try again
-				bdb->decrementUseCount(REL_HISTORY);
-				lockHashRemove.unlock();
-				continue;
+				*ptr = bdb->hash;
+				break;
 				}
+			else
+				ASSERT (*ptr);
 
-			if (bdb->isDirty)
-				writePage (bdb, WRITE_TYPE_REUSE);
+	bdb->addRef (lockType  COMMA_ADD_HISTORY);
 
-			/* Unlink its old incarnation from the page/hash table */
-			for (Bdb **ptr = hashTable + PAGENUM_2_SLOT(bdb->pageNumber) ;; ptr = &(*ptr)->hash)
-				if (*ptr == bdb)
-					{
-					*ptr = bdb->hash;
-					break;
-					}
-				else
-					ASSERT (*ptr);
+	/* Set new page number and relink into hash table */
 
-			}
-
-		break;
-		}
-#ifdef CHECK_STALLED_BDB
-	bdb->stallCount = 0;
-#endif // CHECK_STALLED_BDB
+	bdb->hash = hashTable [slot];
+	hashTable [slot] = bdb;
+	bdb->pageNumber = pageNumber;
+	bdb->dbb = dbb;
 
 #ifdef COLLECT_BDB_HISTORY
 	bdb->initHistory();
 #endif
-	bdb->addRef (Exclusive  COMMA_ADD_HISTORY);
-	bdb->decrementUseCount(REL_HISTORY);
 
 	return bdb;
 }
 
 void Cache::validate()
 {
-	//Sync bufferQueueLock (&bufferQueue.syncObject, "Cache::validate");
-
-	//bufferQueueLock.lock (Shared);
-	// non-protected access to bufferQueue is DANGEROUS...
 	for (Bdb *bdb = bufferQueue.last; bdb; bdb = bdb->prior)
 		{
 		//IndexPage *page = (IndexPage*) bdb->buffer;
@@ -642,8 +474,8 @@ void Cache::validate()
 
 void Cache::markDirty(Bdb *bdb)
 {
-	Sync dirtyLock (&syncDirty, "Cache::markDirty");
-	dirtyLock.lock (Exclusive);
+	Sync sync (&syncDirty, "Cache::markDirty");
+	sync.lock (Exclusive);
 	bdb->nextDirty = NULL;
 	bdb->priorDirty = lastDirty;
 
@@ -653,21 +485,23 @@ void Cache::markDirty(Bdb *bdb)
 		firstDirty = bdb;
 
 	lastDirty = bdb;
+	++numberDirtyPages;
 	//validateUnique (bdb);
 }
 
 void Cache::markClean(Bdb *bdb)
 {
-	Sync dirtyLock (&syncDirty, "Cache::markClean");
-	dirtyLock.lock (Exclusive);
+	Sync sync (&syncDirty, "Cache::markClean");
+	sync.lock (Exclusive);
 
 	/***
 	if (bdb->flushIt)
 		Log::debug(" Cleaning page %d in %s marked for flush\n", bdb->pageNumber, (const char*) bdb->dbb->fileName);
 	***/
-
+	
 	bdb->flushIt = false;
-
+	--numberDirtyPages;
+	
 	if (bdb == lastDirty)
 		lastDirty = bdb->priorDirty;
 
@@ -709,7 +543,6 @@ void Cache::writePage(Bdb *bdb, int type
 		{
 		if (falcon_use_sectorcache)
 			sectorCache->writePage(bdb);
-
 		dbb->writePage(bdb, type);
 		}
 	catch (SQLException& exception)
@@ -762,7 +595,7 @@ void Cache::writePage(Bdb *bdb, int type
 #endif
 
 	bdb->isDirty = false;
-
+	
 	if (pageWriter && bdb->isRegistered)
 		{
 		bdb->isRegistered = false;
@@ -771,8 +604,8 @@ void Cache::writePage(Bdb *bdb, int type
 
 	if (dbb->shadows)
 		{
-		Sync cloneLock (&dbb->syncClone, "Cache::writePage(2)");
-		cloneLock.lock (Shared);
+		Sync sync (&dbb->syncClone, "Cache::writePage(2)");
+		sync.lock (Shared);
 
 		for (DatabaseCopy *shadow = dbb->shadows; shadow; shadow = shadow->next)
 			shadow->rewritePage(bdb);
@@ -781,49 +614,45 @@ void Cache::writePage(Bdb *bdb, int type
 
 void Cache::analyze(Stream *stream)
 {
-	Sync dirtyLock (&syncDirty, "Cache::analyze");
+	Sync sync (&syncDirty, "Cache::analyze");
+	sync.lock (Shared);
 	int inUse = 0;
 	int dirty = 0;
 	int dirtyList = 0;
 	int total = 0;
 	Bdb *bdb;
 
-	// non-protected access to bdbs,endBdbs is DANGEROUS...
 	for (bdb = bdbs; bdb < endBdbs; ++bdb)
 		{
 		++total;
-
+		
 		if (bdb->isDirty)
 			++dirty;
-
+			
 		if (bdb->useCount)
 			++inUse;
 		}
 
-	dirtyLock.lock (Shared);
 	for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		++dirtyList;
 
-	dirtyLock.unlock();
-
 	stream->format ("Cache: %d pages, %d in use, %d dirty, %d in dirty chain\n",
 					total, inUse, dirty, dirtyList);
 }
 
 void Cache::validateUnique(Bdb *target)
 {
-	int	slot = PAGENUM_2_SLOT(target->pageNumber);
+	int	slot = target->pageNumber % hashSize;
 
-	// WARNING: unlocked walk of hash table.... DANGEROUS
 	for (Bdb *bdb = hashTable [slot]; bdb; bdb = bdb->hash)
 		ASSERT (bdb == target || !(bdb->pageNumber == target->pageNumber && bdb->dbb == target->dbb));
 }
 
 void Cache::freePage(Dbb *dbb, int32 pageNumber)
 {
-	int slot = PAGENUM_2_SLOT(pageNumber);
-	Sync lockHash (&syncHashTable[PAGENUM_2_LOCK_INDEX(pageNumber, slot)], "Cache::freePage");
-	lockHash.lock(Shared);
+	Sync sync (&syncObject, "Cache::freePage");
+	sync.lock (Shared);
+	int	slot = pageNumber % hashSize;
 
 	// If page exists in cache (usual case), clean it up
 
@@ -832,9 +661,10 @@ void Cache::freePage(Dbb *dbb, int32 pag
 			{
 			if (bdb->isDirty)
 				{
+				sync.unlock();
 				markClean (bdb);
 				}
-
+				
 			bdb->isDirty = false;
 			break;
 			}
@@ -859,14 +689,12 @@ void Cache::flush(Dbb *dbb)
 
 bool Cache::hasDirtyPages(Dbb *dbb)
 {
-	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
-	dirtyLock.lock (Shared);
+	Sync sync (&syncDirty, "Cache::hasDirtyPages");
+	sync.lock (Shared);
 
 	for (Bdb *bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		if (bdb->dbb == dbb)
-			{
 			return true;
-			}
 
 	return false;
 }
@@ -887,22 +715,32 @@ Bdb* Cache::trialFetch(Dbb* dbb, int32 p
 	if (panicShutdown)
 		{
 		Thread *thread = Thread::getThread("Cache::trialFetch");
-
+		
 		if (thread->pageMarks == 0)
 			throw SQLError(RUNTIME_ERROR, "Emergency shut is underway");
 		}
 
 	ASSERT (pageNumber >= 0);
+	int	slot = pageNumber % hashSize;
+	Sync sync (&syncObject, "Cache::trialFetch");
+	sync.lock (Shared);
+	int hit = 0;
+
+	/* If we already have a buffer for this page, we're done */
+
 	Bdb *bdb;
 
-	/* If we already have a buffer for this, we're done */
-	bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
-	if (bdb)
-		{
-		bdb->addRef(lockType  COMMA_ADD_HISTORY);
-		bdb->decrementUseCount(REL_HISTORY);
-		moveToHead(bdb);
-		}
+	for (bdb = hashTable [slot]; bdb; bdb = bdb->hash)
+		if (bdb->pageNumber == pageNumber && bdb->dbb == dbb)
+			{
+			//syncObject.validateShared("Cache::trialFetch");
+			bdb->incrementUseCount(ADD_HISTORY);
+			sync.unlock();
+			bdb->addRef(lockType  COMMA_ADD_HISTORY);
+			bdb->decrementUseCount(REL_HISTORY);
+			hit = 1;
+			break;
+			}
 
 	return bdb;
 }
@@ -913,14 +751,13 @@ void Cache::syncFile(Dbb *dbb, const cha
 	int writes = dbb->writesSinceSync;
 	time_t start = database->timestamp;
 	dbb->sync();
-
+	
 	if (Log::isActive(LogInfo))
 		{
 		time_t delta = database->timestamp - start;
-
+		
 		if (delta > 1)
-			Log::log(LogInfo, "%d: %s %s sync: %d pages in %d seconds\n",
-					 database->deltaTime, fileName, text, writes, delta);
+			Log::log(LogInfo, "%d: %s %s sync: %d pages in %d seconds\n", database->deltaTime, fileName, text, writes, delta);
 		}
 }
 
@@ -931,186 +768,186 @@ void Cache::ioThread(void* arg)
 
 void Cache::ioThread(void)
 {
-	Sync syncThread(&syncThreads, "Cache::ioThread");
+	Sync syncThread(&syncThreads, "Cache::ioThread(1)");
 	syncThread.lock(Shared);
-	Sync flushLock(&syncFlush, "Cache::ioThread");
+	Sync flushLock(&syncFlush, "Cache::ioThread(2)");
+	Sync sync(&syncObject, "Cache::ioThread(3)");
 	Priority priority(database->ioScheduler);
 	Thread *thread = Thread::getThread("Cache::ioThread");
 	UCHAR *rawBuffer = new UCHAR[ASYNC_BUFFER_SIZE];
 	UCHAR *buffer = (UCHAR*) (((UIPTR) rawBuffer + pageSize - 1) / pageSize * pageSize);
 	UCHAR *end = (UCHAR*) ((UIPTR) (rawBuffer + ASYNC_BUFFER_SIZE) / pageSize * pageSize);
 	flushLock.lock(Exclusive);
-
+	
 	// This is the main loop.  Write blocks until there's nothing to do, then sleep
-
+	
 	for (;;)
 		{
 		int32 pageNumber = flushBitmap->nextSet(0);
 		int count;
-
+		Dbb *dbb;
+		
 		if (pageNumber >= 0)
 			{
-			Bdb *bdb;
-			Dbb *dbb;
-			int	slot = PAGENUM_2_SLOT(pageNumber);
+			int	slot = pageNumber % hashSize;
 			bool hit = false;
 			Bdb *bdbList = NULL;
 			UCHAR *p = buffer;
-
-			// Look for the page to flush.
-			bdb = lockFindBdbIncrementUseCount(pageNumber, slot);
-			if (bdb && bdb->flushIt && bdb->isDirty)
-				{
-				hit = true;
-				count = 0;
-				dbb = bdb->dbb;
-
-				flushBitmap->clear(pageNumber);
-
-				// get all his friends
-				while (p < end)
+			sync.lock(Shared);
+			
+			// Look for a page to flush.  Then get all his friends
+			
+			for (Bdb *bdb = hashTable[slot]; bdb; bdb = bdb->hash)
+				if (bdb->pageNumber == pageNumber && bdb->flushIt && bdb->isDirty)
 					{
-					++count;
-					bdb->addRef(Shared  COMMA_ADD_HISTORY);
-
-					bdb->syncWrite.lock(NULL, Exclusive);
-					bdb->ioThreadNext = bdbList;
-					bdbList = bdb;
-
-					//ASSERT(!(bdb->flags & BDB_write_pending));
-					//bdb->flags |= BDB_write_pending;
-					memcpy(p, bdb->buffer, pageSize);
-					p += pageSize;
-					bdb->flushIt = false;
-					markClean(bdb);
-					bdb->isDirty = false;
-					bdb->release(REL_HISTORY);
-
-					bdb = lockFindBdbIncrementUseCount(dbb, bdb->pageNumber + 1);
-					if (!bdb)
-						break;
-
-					if (!bdb->isDirty && !continueWrite(bdb))
+					hit = true;
+					count = 0;
+					dbb = bdb->dbb;
+					
+					if (!bdb->hash)
+						flushBitmap->clear(pageNumber);
+					
+					while (p < end)
 						{
-						bdb->decrementUseCount(REL_HISTORY);
-						break;
+						++count;
+						bdb->incrementUseCount(ADD_HISTORY);
+						sync.unlock();
+						bdb->addRef(Shared  COMMA_ADD_HISTORY);
+						if (falcon_use_sectorcache)
+							sectorCache->writePage(bdb);
+						
+						bdb->syncWrite.lock(NULL, Exclusive);
+						bdb->ioThreadNext = bdbList;
+						bdbList = bdb;
+						
+						//ASSERT(!(bdb->flags & BDB_write_pending));
+						//bdb->flags |= BDB_write_pending;
+						memcpy(p, bdb->buffer, pageSize);
+						p += pageSize;
+						bdb->flushIt = false;
+						markClean(bdb);
+						bdb->isDirty = false;
+						bdb->release(REL_HISTORY);
+						sync.lock(Shared);
+						
+						if ( !(bdb = findBdb(dbb, bdb->pageNumber + 1)) )
+							break;
+						
+						if (!bdb->isDirty && !continueWrite(bdb))
+							break;
 						}
-					}
-
-				flushLock.unlock();
-				//Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
-				int length = p - buffer;
-				priority.schedule(PRIORITY_LOW);
-
-				try
-					{
+					
+					if (sync.state != None)
+						sync.unlock();
+						
+					flushLock.unlock();
+					//Log::debug(" %d Writing %s %d pages: %d - %d\n", thread->threadId, (const char*) dbb->fileName, count, pageNumber, pageNumber + count - 1);
+					int length = p - buffer;
 					priority.schedule(PRIORITY_LOW);
-					dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
-					}
-				catch (SQLException& exception)
-					{
-					priority.finished();
-
-					if (exception.getSqlcode() != DEVICE_FULL)
-						throw;
-
-					database->setIOError(&exception);
-
-					for (bool error = true; error;)
+					
+					try
+						{
+						priority.schedule(PRIORITY_LOW);
+						dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+						}
+					catch (SQLException& exception)
 						{
-						if (thread->shutdownInProgress)
+						priority.finished();
+						
+						if (exception.getSqlcode() != DEVICE_FULL)
+							throw;
+						
+						database->setIOError(&exception);
+						
+						for (bool error = true; error;)
 							{
-							Bdb *next;
+							if (thread->shutdownInProgress)
+								{
+								Bdb *next;
 
-							for (bdb = bdbList; bdb; bdb = next)
+								for (bdb = bdbList; bdb; bdb = next)
+									{
+									//bdb->flags &= ~BDB_write_pending;
+									next = bdb->ioThreadNext;
+									bdb->syncWrite.unlock();
+									bdb->decrementUseCount(REL_HISTORY);
+									}
+									
+								return;
+								}
+							
+							thread->sleep(1000);
+							
+							try
 								{
-								//bdb->flags &= ~BDB_write_pending;
-								next = bdb->ioThreadNext;
-								bdb->syncWrite.unlock();
-								bdb->decrementUseCount(REL_HISTORY);
+								priority.schedule(PRIORITY_LOW);
+								dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
+								error = false;
+								database->clearIOError();
+								}
+							catch (SQLException& exception2)
+								{
+								priority.finished();
+								
+								if (exception2.getSqlcode() != DEVICE_FULL)
+									throw;
 								}
-
-							return;
-							}
-
-						thread->sleep(1000);
-
-						try
-							{
-							priority.schedule(PRIORITY_LOW);
-							dbb->writePages(pageNumber, length, buffer, WRITE_TYPE_FLUSH);
-							error = false;
-							database->clearIOError();
-							}
-						catch (SQLException& exception2)
-							{
-							priority.finished();
-
-							if (exception2.getSqlcode() != DEVICE_FULL)
-								throw;
 							}
 						}
-					}
 
-				priority.finished();
-				Bdb *next;
+					priority.finished();
+					Bdb *next;
 
-				for (bdb = bdbList; bdb; bdb = next)
-					{
-					//ASSERT(bdb->flags & BDB_write_pending);
-					//bdb->flags &= ~BDB_write_pending;
-					next = bdb->ioThreadNext;
-					bdb->syncWrite.unlock();
-					bdb->decrementUseCount(REL_HISTORY);
+					for (bdb = bdbList; bdb; bdb = next)
+						{
+						//ASSERT(bdb->flags & BDB_write_pending);
+						//bdb->flags &= ~BDB_write_pending;
+						next = bdb->ioThreadNext;
+						bdb->syncWrite.unlock();
+						bdb->decrementUseCount(REL_HISTORY);
+						}
+					
+					flushLock.lock(Exclusive);
+					++physicalWrites;
+					
+					break;
 					}
-
-				flushLock.lock(Exclusive);
-				++physicalWrites;
-
-				}
-			else
-				{
-				if (bdb)
-					bdb->decrementUseCount(REL_HISTORY);
-				}
-
- 			if (!hit)
+			
+			if (!hit)
 				{
+				sync.unlock();
 				flushBitmap->clear(pageNumber);
 				}
 			}
-		else
+		else 
 			{
 			if (flushing)
 				{
 				int writes = physicalWrites;
 				int pages = flushPages;
 				int delta = (int) (database->timestamp - flushStart);
-				int64 callbackArg = flushArg;
 				flushing = false;
-				flushArg = 0;
 				flushLock.unlock();
 				syncWait.unlock();
-
+				
 				if (writes > 0 && Log::isActive(LogInfo))
 					Log::log(LogInfo, "%d: Cache flush: %d pages, %d writes in %d seconds (%d pps)\n",
-							 database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
+								database->deltaTime, pages, writes, delta, pages / MAX(delta, 1));
 
-				if (callbackArg != 0)
-					database->pageCacheFlushed(callbackArg);
+				database->pageCacheFlushed(flushArg);
 				}
 			else
 				flushLock.unlock();
-
+			
 			if (thread->shutdownInProgress)
 				break;
 
 			thread->sleep();
 			flushLock.lock(Exclusive);
 			}
-		} // for ever
-
-	delete [] rawBuffer;
+		}
+	
+	delete [] rawBuffer;			
 }
 
 bool Cache::continueWrite(Bdb* startingBdb)
@@ -1118,26 +955,23 @@ bool Cache::continueWrite(Bdb* startingB
 	Dbb *dbb = startingBdb->dbb;
 	int clean = 1;
 	int dirty = 0;
-
+	
 	for (int32 pageNumber = startingBdb->pageNumber + 1, end = pageNumber+ 5; pageNumber < end; ++pageNumber)
 		{
-		Bdb *bdb;
-
+		Bdb *bdb = findBdb(dbb, pageNumber);
+		
 		if (dirty > clean)
 			return true;
-
-		bdb = lockFindBdbIncrementUseCount(dbb, pageNumber);
+			
 		if (!bdb)
 			return dirty >= clean;
-
+		
 		if (bdb->isDirty)
 			++dirty;
 		else
 			++clean;
-
-		bdb->decrementUseCount(REL_HISTORY);
 		}
-
+	
 	return (dirty >= clean);
 }
 
@@ -1164,97 +998,77 @@ void Cache::shutdownThreads(void)
 		ioThreads[n]->shutdown();
 		ioThreads[n] = 0;
 		}
-
-	Sync lockThreads(&syncThreads, "Cache::shutdownThreads");
-	lockThreads.lock(Exclusive);
+	
+	Sync sync(&syncThreads, "Cache::shutdownThreads");
+	sync.lock(Exclusive);
 }
 
-#ifdef CACHE_TRACE_FILE
 void Cache::analyzeFlush(void)
 {
 	Dbb *dbb = NULL;
 	Bdb *bdb;
-	Sync dirtyLock (&syncDirty, "Cache::hasDirtyPages");
-
-	dirtyLock.lock (Shared);
+	
 	for (bdb = firstDirty; bdb; bdb = bdb->nextDirty)
 		if (bdb->dbb->tableSpaceId == 1)
 			{
 			dbb = bdb->dbb;
-
+			
 			break;
 			}
-	dirtyLock.unlock();
-
+	
 	if (!dbb)
 		return;
-
+	
 	fprintf(traceFile, "-------- time %d -------\n", database->deltaTime);
 
 	for (int pageNumber = 0; (pageNumber = flushBitmap->nextSet(pageNumber)) >= 0;)
-		// non-protected access to hash table via findBdb()!
 		if ( (bdb = findBdb(dbb, pageNumber)) )
 			{
 			int start = pageNumber;
 			int type = bdb->buffer->pageType;
-
-			// non-protected access to hash table via findBdb()!
+			
 			for (; (bdb = findBdb(dbb, ++pageNumber)) && bdb->flushIt;)
 				;
-
+			
 			fprintf(traceFile, " %d flushed: %d to %d, first type %d\n", pageNumber - start, start, pageNumber - 1, type);
-
-			// non-protected access to hash table via findBdb()!
+			
 			for (int max = pageNumber + 5; pageNumber < max && (bdb = findBdb(dbb, pageNumber)) && !bdb->flushIt; ++pageNumber)
 				{
 				if (bdb->isDirty)
 					fprintf(traceFile, "     %d dirty not flushed, type %d \n", pageNumber, bdb->buffer->pageType);
 				else
-					fprintf(traceFile, "     %d not dirty, type %d\n", pageNumber, bdb->buffer->pageType);
+					fprintf(traceFile,"      %d not dirty, type %d\n", pageNumber, bdb->buffer->pageType);
 				}
 			}
 		else
 			++pageNumber;
-
-	fflush(traceFile);
+	
+	fflush(traceFile);			
 }
 
 void Cache::openTraceFile(void)
 {
+#ifdef TRACE_FILE
 	if (traceFile)
 		closeTraceFile();
-
-	traceFile = fopen(TRACE_FILE, "a+");
-	fprintf(traceFile, "Starting\n");
-//KEL
-//	setvbuf(traceFile, (char *) NULL, _IOLBF, 0);
-
+		
+	traceFile = fopen(TRACE_FILE, "w");
+#endif
 }
 
 void Cache::closeTraceFile(void)
 {
+#ifdef TRACE_FILE
 	if (traceFile)
 		{
 		fclose(traceFile);
 		traceFile = NULL;
 		}
+#endif
 }
-#else // CACHE_TRACE_FILE
-void Cache::analyzeFlush(void)
-{
-}
-
-void Cache::openTraceFile(void)
-{
-}
-
-void Cache::closeTraceFile(void)
-{
-}
-#endif // CACHE_TRACE_FILE
 
 void Cache::flushWait(void)
 {
-	Sync waitLock(&syncWait, "Cache::flushWait");
-	waitLock.lock(Exclusive);
+	Sync sync(&syncWait, "Cache::flushWait");
+	sync.lock(Shared);
 }

=== modified file 'storage/falcon/Cache.h'
--- a/storage/falcon/Cache.h	2008-09-02 16:08:11 +0000
+++ b/storage/falcon/Cache.h	2008-10-02 22:15:11 +0000
@@ -28,17 +28,6 @@
 #include "SyncObject.h"
 #include "Queue.h"
 
-// uncomment DEBUG_SYNC_HASH_TABLE_SIZE to cause more contention and test for race conditions
-//#define DEBUG_SYNC_HASH_TABLE_SIZE (0x01 << 1)
-#ifdef DEBUG_SYNC_HASH_TABLE_SIZE
-#  define DEBUG_SYNC_HASH_TABLE_MASK (DEBUG_SYNC_HASH_TABLE_SIZE - 1)
-#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_pgnum) & DEBUG_SYNC_HASH_TABLE_MASK)
-#else /* DEBUG_SYNC_HASH_TABLE_SIZE */
-#  define PAGENUM_2_LOCK_INDEX(_pgnum, _slot) ((_slot))
-#endif /* DEBUG_SYNC_HASH_TABLE_SIZE */
-
-#define PAGENUM_2_SLOT(_pgnum) ((_pgnum) & hashMask)
-
 class Bdb;
 class Dbb;
 class PageWriter;
@@ -65,6 +54,7 @@ public:
 	void	markClean (Bdb *bdb);
 	void	markDirty (Bdb *bdb);
 	void	validate();
+	void	moveToHead (Bdb *bdb);
 	void	flush(int64 arg);
 	void	validateCache(void);
 	void	syncFile(Dbb *dbb, const char *text);
@@ -93,20 +83,14 @@ public:
 	bool		flushing;
 
 protected:
-	void	moveToHead (Bdb *bdb);
-	void	moveToHeadAlreadyLocked (Bdb *bdb);
-	Bdb*		getFreeBuffer(void);
-	Bdb*		findBdb(Dbb* dbb, int32 pageNumber, int slot);
+	Bdb*		findBuffer (Dbb *dbb, int pageNumber, LockType lockType);
 	Bdb*		findBdb(Dbb* dbb, int32 pageNumber);
-	Bdb*		lockFindBdbIncrementUseCount(Dbb* dbb, int32 pageNumber);
-	Bdb*		lockFindBdbIncrementUseCount(int32 pageNumber, int slot);
 
 	int64		flushArg;
 	Bdb			*bdbs;
 	Bdb			*endBdbs;
 	Queue<Bdb>	bufferQueue;
 	Bdb			**hashTable;
-	SyncObject  *syncHashTable;
 	Bdb			*firstDirty;
 	Bdb			*lastDirty;
 	Bitmap		*flushBitmap;
@@ -121,13 +105,12 @@ protected:
 	int			flushPages;
 	int			physicalWrites;
 	int			hashSize;
-	unsigned int	hashMask;
 	int			pageSize;
-	unsigned int upperFraction;
+	int			upperFraction;
 	int			numberHunks;
+	int			numberDirtyPages;
 	int			numberIoThreads;
-	volatile uint64 bufferAge;
-
+	volatile int bufferAge;
 public:
 	void flushWait(void);
 };

=== modified file 'storage/falcon/Database.cpp'
--- a/storage/falcon/Database.cpp	2008-09-11 10:56:00 +0000
+++ b/storage/falcon/Database.cpp	2008-09-30 18:17:19 +0000
@@ -686,7 +686,6 @@ void Database::createDatabase(const char
 		deleteFilesOnExit = true;
 		throw;
 		}
-
 }
 
 void Database::openDatabase(const char * filename)
@@ -1455,50 +1454,61 @@ void Database::dropTable(Table *table, T
 
 void Database::truncateTable(Table *table, Sequence *sequence, Transaction *transaction)
 {
-	Sync syncDDL(&syncSysDDL, "Database::truncateTable(1)");
-	syncDDL.lock(Exclusive);
-	
-	table->checkDrop();
-	
 	// Check for records in active transactions
 
 	if (hasUncommittedRecords(table, transaction))
 		throw SQLError(UNCOMMITTED_UPDATES, "table %s.%s has uncommitted updates and cannot be truncated",
 						table->schemaName, table->name);
-						   
-	// Block table drop/add, table list scans ok
-	
-	Sync syncTbl(&syncTables, "Database::truncateTable(2)");
-	syncTbl.lock(Shared);
+
+	// Lock SystemDDL first.  This lock can happen multiple times in many call stacks,
+	// both before and after the following locks.  So it is important that we get an 
+	// exclusive lock first.
+
+	Sync syncDDLLock(&syncSysDDL, "Database::truncateTable(SysDDL)");
+	syncDDLLock.lock(Exclusive);
+	
+	// Lock syncScavenge after syncSysDDL (taken above) but before syncTables or table->syncObject.
+	// The scavenger locks syncScavenge and then syncTables.
+	// If we run out of record memory, forceRecordScavenge will eventually call table->syncObject.
+
+	Sync syncScavengeLock(&syncScavenge, "Database::truncateTable(scavenge)");
+	syncScavengeLock.lock(Exclusive);
+
+	table->checkDrop();
 	
+	// Block table drop/add, table list scans ok
+
+	Sync syncTablesLock(&syncTables, "Database::truncateTable(tables)");
+	syncTablesLock.lock(Shared);
+
 	//Lock sections (factored out of SRLDropTable to avoid a deadlock)
 	//The lock order (serialLog->syncSections before table->syncObject) is 
 	//important
 
-	Sync syncSections(&serialLog->syncSections, "Database::truncateTable(3)");
-	syncSections.lock(Exclusive);
-	
+	Sync syncSectionsLock(&serialLog->syncSections, "Database::truncateTable(sections)");
+	syncSectionsLock.lock(Exclusive);
+
 	// No table access until truncate completes
-	
-	Sync syncObj(&table->syncObject, "Database::truncateTable(4)");
-	syncObj.lock(Exclusive);
-	
+
+	Sync syncTableLock(&table->syncObject, "Database::truncateTable(table)");
+	syncTableLock.lock(Exclusive);
+
 	table->deleting = true;
-	
+
 	// Purge records out of committed transactions
-	
+
 	transactionManager->truncateTable(table, transaction);
-	
+
 	Transaction *sysTransaction = getSystemTransaction();
-	
+
 	// Recreate data/blob sections and indexes
-	
+
 	table->truncate(sysTransaction);
-	
+
 	commitSystemTransaction();
-	
+
 	// Delete and recreate the sequence
-	
+
 	if (sequence)
 		sequence = sequence->recreate();
 }

=== modified file 'storage/falcon/DeferredIndex.cpp'
--- a/storage/falcon/DeferredIndex.cpp	2008-09-10 19:51:03 +0000
+++ b/storage/falcon/DeferredIndex.cpp	2008-10-04 00:10:34 +0000
@@ -819,7 +819,7 @@ void DeferredIndex::scanIndex(IndexKey *
 void DeferredIndex::detachIndex(void)
 {
 	Sync sync(&syncObject, "DeferredIndex::detachIndex");
-	sync.lock(Shared);
+	sync.lock(Exclusive); // was Shared
 	index = NULL;
 }
 
@@ -884,11 +884,7 @@ void DeferredIndex::addRef()
 
 void DeferredIndex::releaseRef()
 {
-	ASSERT(useCount > 0);
-	
-	INTERLOCKED_DECREMENT(useCount);
-
-	if (useCount == 0)
+	if (INTERLOCKED_DECREMENT(useCount) == 0)
 		delete this;
 }
 

=== modified file 'storage/falcon/SRLUpdateIndex.cpp'
--- a/storage/falcon/SRLUpdateIndex.cpp	2008-07-15 18:57:27 +0000
+++ b/storage/falcon/SRLUpdateIndex.cpp	2008-10-04 00:10:34 +0000
@@ -40,14 +40,29 @@ SRLUpdateIndex::~SRLUpdateIndex(void)
 
 void SRLUpdateIndex::append(DeferredIndex* deferredIndex)
 {
+	uint indexId;
+	int idxVersion;
+	int tableSpaceId;
+
+	Sync syncDI(&deferredIndex->syncObject, "SRLUpdateIndex::append");
+	syncDI.lock(Shared);
+
+	if (!deferredIndex->index)
+		return;
+	else
+		{
+		indexId = deferredIndex->index->indexId;
+		idxVersion = deferredIndex->index->indexVersion;
+		tableSpaceId = deferredIndex->index->dbb->tableSpaceId;
+		}
+
+	syncDI.unlock();
+
 	Sync syncIndexes(&log->syncIndexes, "SRLUpdateIndex::append(1)");
 	syncIndexes.lock(Shared);
 
 	Transaction *transaction = deferredIndex->transaction;
 	DeferredIndexWalker walker(deferredIndex, NULL);
-	uint indexId = deferredIndex->index->indexId;
-	int idxVersion = deferredIndex->index->indexVersion;
-	int tableSpaceId = deferredIndex->index->dbb->tableSpaceId;
 	uint64 virtualOffset = 0;
 	uint64 virtualOffsetAtEnd = 0;
 

=== modified file 'storage/falcon/SerialLogWindow.cpp'
--- a/storage/falcon/SerialLogWindow.cpp	2008-03-11 16:15:47 +0000
+++ b/storage/falcon/SerialLogWindow.cpp	2008-10-02 09:01:39 +0000
@@ -117,7 +117,7 @@ SerialLogBlock* SerialLogWindow::readFir
 void SerialLogWindow::write(SerialLogBlock *block)
 {
 	uint32 length = ROUNDUP(block->length, sectorSize);
-	uint32 offset = (int) (origin + ((UCHAR*) block - buffer));
+	int64 offset = origin + ((UCHAR*) block - buffer);
 	ASSERT(length <= bufferLength);
 	
 	try

=== modified file 'storage/falcon/StorageHandler.cpp'
--- a/storage/falcon/StorageHandler.cpp	2008-09-11 10:56:00 +0000
+++ b/storage/falcon/StorageHandler.cpp	2008-10-01 03:13:44 +0000
@@ -61,11 +61,11 @@ static const char *createTempSpace = "up
 static const char *falconSchema [] = {
 	//"create tablespace " DEFAULT_TABLESPACE " filename '" FALCON_USER "' allocation 2000000000",
 	createTempSpace,
-	
+
 	"upgrade table falcon.tablespaces ("
 	"    name varchar(128) not null primary	key,"
 	"    pathname varchar(1024) not null)",
-	
+
 	"upgrade table falcon.tables ("
 	"    given_schema_name varchar(128) not null,"
 	"    effective_schema_name varchar(128) not null,"
@@ -73,11 +73,11 @@ static const char *falconSchema [] = {
 	"    effective_table_name varchar(128) not null,"
 	"    tablespace_name varchar(128) not null,"
 	"    pathname varchar(1024) not null primary key)",
-	
+
 	"upgrade unique index effective on falcon.tables (effective_schema_name, effective_table_name)",
 
 	NULL };
-				
+
 class Server;
 extern Server*	startServer(int port, const char *configFile);
 
@@ -95,13 +95,13 @@ static const char THIS_FILE[]=__FILE__;
 int init()
 {
 	const char *p;
-	
+
 	for (p = WHITE_SPACE; *p; p++)
 		charTable[(unsigned char)*p] = 1;
-	
+
 	for (p = PUNCTUATION_CHARS; *p; p++)
 		charTable[(unsigned char)*p] = 1;
-	
+
 	return 1;
 }
 
@@ -109,7 +109,7 @@ StorageHandler*	getFalconStorageHandler(
 {
 	if (!storageHandler)
 		storageHandler = new StorageHandler(lockSize);
-	
+
 	return storageHandler;
 }
 
@@ -151,7 +151,7 @@ StorageHandler::~StorageHandler(void)
 			storageDatabases[n] = storageDatabase->collision;
 			delete storageDatabase;
 			}
-	
+
 	for (int n = 0; n < tableHashSize; ++n)
 		for (StorageTableShare *table; (table = tables[n]);)
 			{
@@ -190,11 +190,11 @@ void StorageHandler::shutdownHandler(voi
 		dictionaryConnection->close();
 		dictionaryConnection = NULL;
 		}
-	
+
 	for (int n = 0; n < databaseHashSize; ++n)
 		for (StorageDatabase *storageDatabase = storageDatabases[n]; storageDatabase; storageDatabase = storageDatabase->collision)
 			storageDatabase->close();
-	
+
 	/***
 	Configuration configuration(NULL);
 	Connection *connection = new Connection(&configuration);
@@ -207,14 +207,14 @@ void StorageHandler::databaseDropped(Sto
 {
 	if (!storageDatabase && storageConnection)
 		storageDatabase = storageConnection->storageDatabase;
-		
+
 	if (storageDatabase)
 		{
 		Sync syncHash(&hashSyncObject, "StorageHandler::databaseDropped(1)");
 		int slot = JString::hash(storageDatabase->name, databaseHashSize);
 		syncHash.lock(Exclusive);
 		StorageDatabase **ptr;
-		
+
 		for (ptr = storageDatabases + slot; *ptr; ptr = &(*ptr)->collision)
 			if (*ptr == storageDatabase)
 				{
@@ -240,7 +240,7 @@ void StorageHandler::databaseDropped(Sto
 		for (StorageConnection *cnct = connections[n]; cnct; cnct = cnct->collision)
 			if (cnct != storageConnection)
 				cnct->databaseDropped(storageDatabase);
-			
+
 	sync.unlock();
 }
 
@@ -256,8 +256,8 @@ int StorageHandler::startTransaction(THD
 	Sync sync(&syncObject, "StorageHandler::commit");
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
-	
-	for (StorageConnection *storageConnection = connections[slot]; 
+
+	for (StorageConnection *storageConnection = connections[slot];
 		 storageConnection; storageConnection = storageConnection->collision)
 		{
 		if (storageConnection->mySqlThread == mySqlThread)
@@ -276,16 +276,16 @@ int StorageHandler::commit(THD* mySqlThr
 	Sync sync(&syncObject, "StorageHandler::commit");
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
-	
+
 	for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
 		if (connection->mySqlThread == mySqlThread)
 			{
 			int ret =connection->commit();
-			
+
 			if (ret)
 				return ret;
 			}
-	
+
 	return 0;
 }
 
@@ -294,16 +294,16 @@ int StorageHandler::prepare(THD* mySqlTh
 	Sync sync(&syncObject, "StorageHandler::prepare");
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
-	
+
 	for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
 		if (connection->mySqlThread == mySqlThread)
 			{
 			int ret = connection->prepare(xidSize, xid);
-			
+
 			if (ret)
 				return ret;
 			}
-	
+
 	return 0;
 }
 
@@ -312,16 +312,16 @@ int StorageHandler::rollback(THD* mySqlT
 	Sync sync(&syncObject, "StorageHandler::rollback");
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
-	
+
 	for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
 		if (connection->mySqlThread == mySqlThread)
 			{
 			int ret = connection->rollback();
-			
+
 			if (ret)
 				return ret;
 			}
-	
+
 	return 0;
 }
 
@@ -330,11 +330,11 @@ int StorageHandler::releaseVerb(THD* myS
 	Sync sync(&syncObject, "StorageHandler::releaseVerb");
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
-	
+
 	for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
 		if (connection->mySqlThread == mySqlThread)
 			connection->releaseVerb();
-	
+
 	return 0;
 }
 
@@ -343,11 +343,11 @@ int StorageHandler::rollbackVerb(THD* my
 	Sync sync(&syncObject, "StorageHandler::rollbackVerb");
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
-	
+
 	for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
 		if (connection->mySqlThread == mySqlThread)
 			connection->rollbackVerb();
-	
+
 	return 0;
 }
 
@@ -357,7 +357,7 @@ int StorageHandler::savepointSet(THD* my
 	sync.lock(Shared);
 	int slot = HASH(mySqlThread, connectionHashSize);
 	StorageSavepoint *savepoints = NULL;
-	
+
 	for (StorageConnection *connection = connections[slot]; connection; connection = connection->collision)
 		if (connection->mySqlThread == mySqlThread)
 			{
@@ -367,9 +367,9 @@ int StorageHandler::savepointSet(THD* my
 			savepoint->storageConnection = connection;
 			savepoint->savepoint = connection->savepointSet();
 			}
-	
+
 	*((void**) savePoint) = savepoints;
-	
+
 	return 0;
 }
 
@@ -377,15 +377,15 @@ int StorageHandler::savepointRelease(THD
 {
 	Sync sync(&syncObject, "StorageHandler::savepointRelease");
 	sync.lock(Shared);
-	
-	for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint; 
+
+	for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint;
 		  (savepoint = savepoints);)
 		{
 		savepoint->storageConnection->savepointRelease(savepoint->savepoint);
 		savepoints = savepoint->next;
 		delete savepoint;
 		}
-		
+
 	*((void**) savePoint) = NULL;
 
 	return 0;
@@ -395,15 +395,15 @@ int StorageHandler::savepointRollback(TH
 {
 	Sync sync(&syncObject, "StorageHandler::savepointRollback");
 	sync.lock(Shared);
-	
-	for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint; 
+
+	for (StorageSavepoint *savepoints = *(StorageSavepoint**) savePoint, *savepoint;
 		   (savepoint = savepoints);)
 		{
 		savepoint->storageConnection->savepointRollback(savepoint->savepoint);
 		savepoints = savepoint->next;
 		delete savepoint;
 		}
-	
+
 	*((void**) savePoint) = NULL;
 
 	return 0;
@@ -414,22 +414,22 @@ StorageDatabase* StorageHandler::getStor
 	Sync sync(&hashSyncObject, "StorageHandler::getStorageDatabase");
 	int slot = JString::hash(dbName, databaseHashSize);
 	StorageDatabase *storageDatabase;
-	
+
 	if (storageDatabases[slot])
 		{
 		sync.lock(Shared);
-		
+
 		if ( (storageDatabase = findDatabase(dbName)) )
 			return storageDatabase;
-			
+
 		sync.unlock();
 		}
-		
+
 	sync.lock(Exclusive);
 
 	if ( (storageDatabase = findDatabase(dbName)) )
 		return storageDatabase;
-	
+
 	storageDatabase = new StorageDatabase(this, dbName, path);
 	storageDatabase->load();
 	storageDatabase->collision = storageDatabases[slot];
@@ -437,7 +437,7 @@ StorageDatabase* StorageHandler::getStor
 	storageDatabase->addRef();
 	storageDatabase->next = databaseList;
 	databaseList = storageDatabase;
-	
+
 	return storageDatabase;
 }
 
@@ -446,7 +446,7 @@ void StorageHandler::closeDatabase(const
 	Sync sync(&hashSyncObject, "StorageHandler::closeDatabase");
 	int slot = JString::hash(path, databaseHashSize);
 	sync.lock(Exclusive);
-	
+
 	for (StorageDatabase *storageDatabase, **ptr = storageDatabases + slot; (storageDatabase = *ptr); ptr = &storageDatabase->collision)
 		if (storageDatabase->filename == path)
 			{
@@ -505,9 +505,7 @@ int StorageHandler::createTablespace(con
 		return StorageErrorTableSpaceExist;
 		}
 
-	JString tableSpace = JString::upcase(tableSpaceName);
-
-	TableSpaceManager *tableSpaceManager = 
+	TableSpaceManager *tableSpaceManager =
 		dictionaryConnection->database->tableSpaceManager;
 
 	if (!tableSpaceManager->waitForPendingDrop(filename, 10))
@@ -527,16 +525,16 @@ int StorageHandler::createTablespace(con
 		{
 		if (exception.getSqlcode() == TABLESPACE_EXIST_ERROR)
 			return StorageErrorTableSpaceExist;
-			
+
 		if (exception.getSqlcode() == TABLESPACE_NOT_EXIST_ERROR)
 			return StorageErrorTableSpaceNotExist;
 
 		if (exception.getSqlcode() == TABLESPACE_DATAFILE_EXIST_ERROR)
 			return StorageErrorTableSpaceDataFileExist;
-			
+
 		return StorageErrorTablesSpaceOperationFailed;
 		}
-	
+
 	return 0;
 }
 
@@ -554,7 +552,7 @@ int StorageHandler::deleteTablespace(con
 		{
 		return StorageErrorTablesSpaceOperationFailed;
 		}
-		
+
 	try
 		{
 		CmdGen gen;
@@ -568,16 +566,16 @@ int StorageHandler::deleteTablespace(con
 	catch (SQLException& exception)
 		{
 		int sqlCode = exception.getSqlcode();
-		
+
 		if (sqlCode == TABLESPACE_NOT_EXIST_ERROR)
 			return StorageErrorTableSpaceNotExist;
-			
+
 		if (sqlCode == TABLESPACE_NOT_EMPTY)
 			return StorageErrorTableNotEmpty;
-			
+
 		return StorageErrorTablesSpaceOperationFailed;
 		}
-	
+
 	return 0;
 }
 
@@ -592,26 +590,26 @@ StorageTableShare* StorageHandler::findT
 	if (tables[slot])
 		{
 		sync.lock(Shared);
-		
+
 		for (tableShare = tables[slot]; tableShare; tableShare = tableShare->collision)
 			if (tableShare->pathName == filename)
 				return tableShare;
-	
+
 		sync.unlock();
 		}
 
 	sync.lock(Exclusive);
-	
+
 	for (tableShare = tables[slot]; tableShare; tableShare = tableShare->collision)
 		if (tableShare->pathName == filename)
 			return tableShare;
-	
+
 	tableShare = new StorageTableShare(this, filename, NULL, mySqlLockSize, false);
 	tableShare->collision = tables[slot];
 	tables[slot] = tableShare;
-	
+
 	ASSERT(tableShare->collision != tableShare);
-	
+
 	return tableShare;
 }
 
@@ -632,7 +630,7 @@ StorageTableShare* StorageHandler::preDe
 		{
 		Sync sync(&hashSyncObject, "StorageHandler::preDeleteTable");
 		sync.lock(Shared);
-		
+
 		for (tableShare = tables[slot]; tableShare; tableShare = tableShare->collision)
 			if (tableShare->pathName == filename)
 				return tableShare;
@@ -643,14 +641,14 @@ StorageTableShare* StorageHandler::preDe
 		tableShare = new StorageTableShare(this, filename, NULL, mySqlLockSize, false);
 		JString path = tableShare->lookupPathName();
 		delete tableShare;
-		
+
 		if (path == pathname)
 			return findTable(pathname);
 		}
 	catch (...)
 		{
 		}
-	
+
 	return NULL;
 }
 
@@ -663,17 +661,17 @@ StorageTableShare* StorageHandler::creat
 		return NULL;
 
 	StorageTableShare *tableShare = new StorageTableShare(this, pathname, tableSpaceName, mySqlLockSize, tempTable);
-	
+
 	if (tableShare->tableExists())
 		{
 		delete tableShare;
-		
+
 		return NULL;
 		}
 
 	addTable(tableShare);
 	tableShare->registerTable();
-	
+
 	return tableShare;
 }
 
@@ -684,7 +682,7 @@ void StorageHandler::addTable(StorageTab
 	sync.lock(Exclusive);
 	table->collision = tables[slot];
 	tables[slot] = table;
-	
+
 	ASSERT(table->collision != table);
 }
 
@@ -693,7 +691,7 @@ void StorageHandler::removeTable(Storage
 	Sync sync(&hashSyncObject, "StorageHandler::removeTable");
 	sync.lock(Exclusive);
 	int slot = JString::hash(table->pathName, tableHashSize);
-	
+
 	for (StorageTableShare **ptr = tables + slot; *ptr; ptr = &(*ptr)->collision)
 		if (*ptr == table)
 			{
@@ -705,7 +703,7 @@ void StorageHandler::removeTable(Storage
 StorageConnection* StorageHandler::getStorageConnection(StorageTableShare* tableShare, THD* mySqlThread, int mySqlThdId, OpenOption createFlag)
 {
 	Sync sync(&syncObject, "StorageHandler::getStorageConnection");
-	
+
 	if (!defaultDatabase)
 		initialize();
 
@@ -727,10 +725,10 @@ StorageConnection* StorageHandler::getSt
 			if (storageConnection->mySqlThread == mySqlThread) // && storageConnection->storageDatabase == tableShare->storageDatabase)
 				{
 				storageConnection->addRef();
-				
+
 				if (!tableShare->storageDatabase)
 					tableShare->setDatabase(storageDatabase);
-					
+
 				return storageConnection;
 				}
 
@@ -743,16 +741,16 @@ StorageConnection* StorageHandler::getSt
 		if (storageConnection->mySqlThread == mySqlThread) // && storageConnection->storageDatabase == tableShare->storageDatabase)
 			{
 			storageConnection->addRef();
-				
+
 			if (!tableShare->storageDatabase)
 				tableShare->setDatabase(storageDatabase);
-					
+
 			return storageConnection;
 			}
-	
+
 	storageConnection = new StorageConnection(this, storageDatabase, mySqlThread, mySqlThdId);
 	bool success = false;
-	
+
 	if (createFlag != CreateDatabase) // && createFlag != OpenTemporaryDatabase)
 		try
 			{
@@ -763,15 +761,15 @@ StorageConnection* StorageHandler::getSt
 			{
 			//fprintf(stderr, "database open failed: %s\n", exception.getText());
 			storageConnection->setErrorText(exception.getText());
-			
+
 			if (createFlag == OpenDatabase)
 				{
 				delete storageConnection;
-				
+
 				return NULL;
 				}
 			}
-	
+
 	if (!success && createFlag != OpenDatabase)
 		try
 			{
@@ -780,29 +778,29 @@ StorageConnection* StorageHandler::getSt
 		catch (SQLException&)
 			{
 			delete storageConnection;
-			
+
 			return NULL;
 			}
-	
+
 	tableShare->setDatabase(storageDatabase);
 	storageConnection->collision = connections[slot];
 	connections[slot] = storageConnection;
-	
+
 	return storageConnection;
 }
 
 StorageDatabase* StorageHandler::findDatabase(const char* dbName)
 {
 	int slot = JString::hash(dbName, databaseHashSize);
-	
+
 	for (StorageDatabase *storageDatabase = storageDatabases[slot]; storageDatabase; storageDatabase = storageDatabase->collision)
 		if (storageDatabase->name == dbName)
 			{
 			storageDatabase->addRef();
-			
+
 			return storageDatabase;
 			}
-			
+
 	return NULL;
 }
 
@@ -820,7 +818,7 @@ void StorageHandler::changeMySqlThread(S
 void StorageHandler::removeConnection(StorageConnection* storageConnection)
 {
 	int slot = HASH(storageConnection->mySqlThread, connectionHashSize);
-	
+
 	for (StorageConnection **ptr = connections + slot; *ptr; ptr = &(*ptr)->collision)
 		if (*ptr == storageConnection)
 			{
@@ -838,7 +836,7 @@ int StorageHandler::closeConnections(THD
 	for (StorageConnection *storageConnection = connections[slot], *next; storageConnection; storageConnection = next)
 		{
 		next = storageConnection->collision;
-		
+
 		if (storageConnection->mySqlThread == thd)
 			{
 			sync.unlock();
@@ -847,7 +845,7 @@ int StorageHandler::closeConnections(THD
 
 			if (storageConnection->mySqlThread)
 				storageConnection->release();	// This is for thd->ha_data[falcon_hton->slot]
-			
+
 			storageConnection->release();	// This is for storageConn
 			}
 		}
@@ -861,25 +859,25 @@ int StorageHandler::dropDatabase(const c
 	char pathname[FILENAME_MAX];
 	const char *SEPARATOR = pathname;
 	char *q = pathname;
-	
+
 	for (const char *p = path; *p;)
 		{
 		char c = *p++;
-		
+
 		if (c == '/')
 			{
 			if (*p == 0)
 				break;
-				
+
 			SEPARATOR = q + 1;
 			}
-		
+
 		*q++ = c;
 		}
-	
+
 	*q = 0;
 	JString dbName = JString::upcase(SEPARATOR);
-	strcpy(q, StorageTableShare::getDefaultRoot());	
+	strcpy(q, StorageTableShare::getDefaultRoot());
 	StorageDatabase *storageDatabase = getStorageDatabase(dbName, pathname);
 	databaseDropped(storageDatabase, NULL);
 
@@ -893,7 +891,7 @@ int StorageHandler::dropDatabase(const c
 
 	storageDatabase->release();
 	***/
-	
+
 	return 0;
 }
 
@@ -901,7 +899,7 @@ void StorageHandler::getIOInfo(InfoTable
 {
 	Sync sync(&hashSyncObject, "StorageHandler::getIOInfo");
 	sync.lock(Shared);
-	
+
 	for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
 		storageDatabase->getIOInfo(infoTable);
 }
@@ -930,7 +928,7 @@ void StorageHandler::getTransactionInfo(
 {
 	Sync sync(&hashSyncObject, "StorageHandler::getTransactionInfo");
 	sync.lock(Shared);
-	
+
 	for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
 		storageDatabase->getTransactionInfo(infoTable);
 }
@@ -939,7 +937,7 @@ void StorageHandler::getSerialLogInfo(In
 {
 	Sync sync(&hashSyncObject, "StorageHandler::getSerialLogInfo");
 	sync.lock(Shared);
-	
+
 	for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
 		storageDatabase->getSerialLogInfo(infoTable);
 }
@@ -953,7 +951,7 @@ void StorageHandler::getTransactionSumma
 {
 	Sync sync(&hashSyncObject, "StorageHandler::getTransactionSummaryInfo");
 	sync.lock(Shared);
-	
+
 	for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
 		storageDatabase->getTransactionSummaryInfo(infoTable);
 }
@@ -962,7 +960,7 @@ void StorageHandler::getTableSpaceInfo(I
 {
 	Sync sync(&hashSyncObject, "StorageHandler::getTableSpaceInfo");
 	sync.lock(Shared);
-	
+
 	for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
 		storageDatabase->getTableSpaceInfo(infoTable);
 }
@@ -971,7 +969,7 @@ void StorageHandler::getTableSpaceFilesI
 {
 	Sync sync(&hashSyncObject, "StorageHandler::getTableSpaceFilesInfo");
 	sync.lock(Shared);
-	
+
 	for (StorageDatabase *storageDatabase = databaseList; storageDatabase; storageDatabase = storageDatabase->next)
 		storageDatabase->getTableSpaceFilesInfo(infoTable);
 }
@@ -980,16 +978,16 @@ void StorageHandler::initialize(void)
 {
 	if (initialized)
 		return;
-	
+
 	Sync sync(&syncObject, "StorageHandler::initialize");
 	sync.lock(Exclusive);
-	
+
 	if (initialized)
 		return;
-		
+
 	initialized = true;
 	defaultDatabase = getStorageDatabase(MASTER_NAME, MASTER_PATH);
-	
+
 	try
 		{
 		defaultDatabase->getOpenConnection();
@@ -1001,7 +999,7 @@ void StorageHandler::initialize(void)
 		{
 		int err = e.getSqlcode();
 
-		// If got one of following errors, just rethrow. No point in 
+		// If got one of following errors, just rethrow. No point in
 		// trying to create database.
 		if (err != OPEN_MASTER_ERROR)
 			throw;
@@ -1046,14 +1044,14 @@ void StorageHandler::createDatabase(void
 void StorageHandler::dropTempTables(void)
 {
 	Statement *statement = dictionaryConnection->createStatement();
-	
+
 	try
 		{
 		PStatement select = dictionaryConnection->prepareStatement(
 			"select schema,tablename from system.tables where tablespace='" TEMPORARY_TABLESPACE "'");
 		RSet resultSet = select->executeQuery();
 		bool hit = false;
-		
+
 		while (resultSet->next())
 			{
 			CmdGen gen;
@@ -1061,14 +1059,14 @@ void StorageHandler::dropTempTables(void
 			statement->executeUpdate(gen.getString());
 			hit = true;
 			}
-		
+
 		//if (hit)
 			//statement->executeUpdate(dropTempSpace);
 		}
 	catch(...)
 		{
 		}
-	
+
 	try
 		{
 		statement->executeUpdate(createTempSpace);
@@ -1077,7 +1075,7 @@ void StorageHandler::dropTempTables(void
 		{
 		Log::log("Can't create temporary tablespace: %s\n", exception.getText());
 		}
-	
+
 	statement->close();
 }
 
@@ -1118,7 +1116,7 @@ void StorageHandler::cleanFileName(const
 	char *q = filename;
 	char *end = filename + filenameLength - 1;
 	filename[0] = 0;
-	
+
 	for (const char *p = pathname; q < end && (c = *p++); prior = c)
 		if (c != SEPARATOR || c != prior)
 			*q++ = c;
@@ -1136,7 +1134,7 @@ void StorageHandler::getFalconVersionInf
 
 int StorageHandler::recoverGetNextLimbo(int xidLength, unsigned char* xid)
 	{
-	if (!defaultDatabase)	
+	if (!defaultDatabase)
 		initialize();
 
 	if (Connection* connection = dictionaryConnection)
@@ -1149,14 +1147,14 @@ const char* StorageHandler::normalizeNam
 {
 	char *q = buffer;
 	char *end = buffer + bufferSize - 1;
-	
+
 	for (const char *p = name; *p && q < end; ++p)
 		if (charTable[(unsigned char)*p])
 			return name;
 		else
 			*q++ = UPPER(*p);
-	
+
 	*q = 0;
-	
+
 	return buffer;
 }

=== modified file 'storage/falcon/StorageVersion.h'
--- a/storage/falcon/StorageVersion.h	2008-05-22 13:31:03 +0000
+++ b/storage/falcon/StorageVersion.h	2008-10-06 08:40:27 +0000
@@ -14,5 +14,5 @@
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
 
-#define FALCON_VERSION	"T1.2-5"
-#define FALCON_DATE		"22 May, 2008"
+#define FALCON_VERSION	"T1.2-6"
+#define FALCON_DATE		"06 October, 2008"

=== modified file 'storage/falcon/Table.cpp'
--- a/storage/falcon/Table.cpp	2008-09-08 11:51:19 +0000
+++ b/storage/falcon/Table.cpp	2008-10-03 23:56:24 +0000
@@ -2117,7 +2117,10 @@ void Table::garbageCollect(Record *leavi
 	if (!leaving && !staying)
 		return;
 
-	Sync syncPrior(getSyncPrior(leaving ? leaving : staying), "Table::garbageCollect");
+	Sync sync (&syncObject, "Table::garbageCollect(1)");
+	sync.lock(Shared);
+	
+	Sync syncPrior(getSyncPrior(leaving ? leaving : staying), "Table::garbageCollect(2)");
 	syncPrior.lock(Shared);
 	
 	// Clean up field indexes

=== modified file 'storage/falcon/Transaction.cpp'
--- a/storage/falcon/Transaction.cpp	2008-09-10 19:51:03 +0000
+++ b/storage/falcon/Transaction.cpp	2008-10-02 23:51:36 +0000
@@ -1076,14 +1076,10 @@ void Transaction::addRef()
 	INTERLOCKED_INCREMENT(useCount);
 }
 
-int Transaction::release()
+void Transaction::release()
 {
-	int count = INTERLOCKED_DECREMENT(useCount);
-
-	if (count == 0)
+	if (INTERLOCKED_DECREMENT(useCount) == 0)
 		delete this;
-
-	return count;
 }
 
 int Transaction::createSavepoint()
@@ -1317,7 +1313,7 @@ void Transaction::add(DeferredIndex* def
 	Sync sync(&syncDeferredIndexes, "Transaction::add");
 	sync.lock(Exclusive);
 
-	deferredIndex->addRef();
+//	deferredIndex->addRef(); // temporarily disabled for Bug#39711
 	deferredIndex->nextInTransaction = deferredIndexes;
 	deferredIndexes = deferredIndex;
 	deferredIndexCount++;

=== modified file 'storage/falcon/Transaction.h'
--- a/storage/falcon/Transaction.h	2008-09-10 04:02:07 +0000
+++ b/storage/falcon/Transaction.h	2008-10-02 23:20:47 +0000
@@ -99,7 +99,7 @@ public:
 	void		prepare(int xidLength, const UCHAR *xid);
 	void		rollback();
 	void		commit();
-	int			release();
+	void		release();
 	void		addRef();
 	void		waitForTransaction();
 	bool		waitForTransaction (TransId transId);

=== modified file 'storage/falcon/ha_falcon.cpp'
--- a/storage/falcon/ha_falcon.cpp	2008-09-16 17:58:49 +0000
+++ b/storage/falcon/ha_falcon.cpp	2008-10-03 05:15:40 +0000
@@ -2180,46 +2180,6 @@ int StorageInterface::check_if_supported
 			}
 		}
 		
-	// TODO for Add Index:
-	// 1. Check for supported ALTER combinations
-	// 2. Can error message be improved for non-null columns?
-	
-	if (alter_flags->is_set(HA_ADD_INDEX) || alter_flags->is_set(HA_ADD_UNIQUE_INDEX))
-		{
-		for (unsigned int n = 0; n < altered_table->s->keys; n++)
-			{
-			if (n != altered_table->s->primary_key)
-				{
-				KEY *key = altered_table->key_info + n;
-				KEY *tableEnd = table->key_info + table->s->keys;
-				KEY *tableKey;
-				
-				// Determine if this is a new index
-
-				for (tableKey = table->key_info; tableKey < tableEnd; tableKey++)
-					if (!strcmp(tableKey->name, key->name))
-						break;
-				
-				// Verify that each part is nullable
-				
-				if (tableKey >= tableEnd)
-					for (uint p = 0; p < key->key_parts; p++)
-						{
-						KEY_PART_INFO *keyPart = key->key_part + p;
-						if (keyPart && !keyPart->field->real_maybe_null())
-							{
-							DBUG_PRINT("info",("Online add index columns must be nullable"));
-							DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
-							}
-						}
-				}
-			}
-		}
-		
-	if (alter_flags->is_set(HA_DROP_INDEX) || alter_flags->is_set(HA_DROP_UNIQUE_INDEX))
-		{
-		}
-		
 	DBUG_RETURN(HA_ALTER_SUPPORTED_NO_LOCK);
 }
 

Thread
bzr commit into mysql-6.0 branch (kgeorge:2855) Georgi Kodinov 7 Oct