Hi Libing,
Nice work! Patch is almost OK!
STATUS
------
Not approved!
REQUIRED CHANGES
----------------
RC1. I think:
block_num_max(init_nodes/TRANX_NODE_NUM +
init_nodes % TRANX_NODE_NUM > 1 ? 2 : 1),
should be:
block_num_max(init_nodes/TRANX_NODE_NUM +
(init_nodes % TRANX_NODE_NUM > 0 ? 2 : 1)),
And I'd also suggest to name 'block_num_max' as something like
'max_reserved_block_num' because this is not the maximum blocks that can
be allocated, this is the max number of blocks that will not be freed.
RC2. restore the values for last_node in allocate_node() function when
the call to allocate_block() failed.
REQUESTS
--------
n/a
SUGGESTIONS
-----------
S1. I'd suggest rename TRANX_NODE_NUM to BLOCK_TRANX_NODES
S2. combine the code in ActiveTranx::allocate_node to
TranxNodeAllocatoer::allocate_node, and remove the former.
S3. Please add some comments to explain how the allocator allocate and
free nodes/blocks.
Please also look below for more comments.
Li-Bing.Song@stripped wrote:
> #At file:///home/anders/work/bzrroot/mysql-5.1-rep-semisync/ based on
> revid:zhenxing.he@stripped
>
> 3127 Li-Bing.Song@stripped 2010-01-25
> BUG#50157 Assertion !active_tranxs_->is_tranx_end_pos(..) in
> ReplSemiSyncMaster::commitTrx
>
> The root cause of the crash is that a TranxNode is freed before it is used.
> A TranxNode is allocated and insertted into the active list each time
> when some log events are written into binlog file and is flushed.
> The memory for TranxNode is allocted with thd_alloc and will be freed
> after at the end of the statement. The after_commit/after_rollback callback
> was supposed to be call before the end of each statement and remove the node
> from
> the active list. However this assumption is not correct in all cases(e.g.
> CREATE
> TEMPORARY ... SELECT), and can cause the memory allocated for TranxNode be
> freed
> before it was removed from the active list. So The TranxNode pointer in the
> active
> list would become a wild pointer and cause the crash.
>
> After this patch, The memory for TranxNode will be allocated by my_malloc.
> @ sql/rpl_handler.cc
> params are not initialized.
>
> modified:
> mysql-test/suite/rpl/r/rpl_semi_sync.result
> mysql-test/suite/rpl/t/rpl_semi_sync.test
> plugin/semisync/semisync_master.cc
> plugin/semisync/semisync_master.h
> sql/rpl_handler.cc
> === modified file 'mysql-test/suite/rpl/r/rpl_semi_sync.result'
> --- a/mysql-test/suite/rpl/r/rpl_semi_sync.result 2009-10-23 04:56:30 +0000
> +++ b/mysql-test/suite/rpl/r/rpl_semi_sync.result 2010-01-25 09:04:10 +0000
> @@ -120,6 +120,25 @@ min(a)
> select max(a) from t1;
> max(a)
> 300
> +
> +# BUG#50157
> +# semi-sync replication crashes when replicating a transaction which
> +# include 'CREATE TEMPORARY TABLE `MyISAM_t` SELECT * FROM `Innodb_t` ;
> +[ on master ]
> +SET SESSION AUTOCOMMIT= 0;
> +CREATE TABLE t2(c1 INT) ENGINE=innodb;
> +BEGIN;
> +
> +# Even though it is in a transaction, this statement is binlogged into binlog
> +# file immediately.
> +CREATE TEMPORARY TABLE t3 SELECT c1 FROM t2 where 1=1;
> +
> +# These statements will not binlogged until the transaction is committed
> +INSERT INTO t2 VALUES(11);
> +INSERT INTO t2 VALUES(22);
> +COMMIT;
> +DROP TABLE t2, t3;
> +SET SESSION AUTOCOMMIT= 1;
> #
> # Test semi-sync master will switch OFF after one transacton
> # timeout waiting for slave reply.
> @@ -135,7 +154,7 @@ Variable_name Value
> Rpl_semi_sync_master_no_tx 0
> show status like 'Rpl_semi_sync_master_yes_tx';
> Variable_name Value
> -Rpl_semi_sync_master_yes_tx 301
> +Rpl_semi_sync_master_yes_tx 304
> show status like 'Rpl_semi_sync_master_clients';
> Variable_name Value
> Rpl_semi_sync_master_clients 1
> @@ -150,7 +169,7 @@ Variable_name Value
> Rpl_semi_sync_master_no_tx 1
> show status like 'Rpl_semi_sync_master_yes_tx';
> Variable_name Value
> -Rpl_semi_sync_master_yes_tx 301
> +Rpl_semi_sync_master_yes_tx 304
> insert into t1 values (100);
> [ master status should be OFF ]
> show status like 'Rpl_semi_sync_master_status';
> @@ -161,7 +180,7 @@ Variable_name Value
> Rpl_semi_sync_master_no_tx 302
> show status like 'Rpl_semi_sync_master_yes_tx';
> Variable_name Value
> -Rpl_semi_sync_master_yes_tx 301
> +Rpl_semi_sync_master_yes_tx 304
> #
> # Test semi-sync status on master will be ON again when slave catches up
> #
> @@ -194,7 +213,7 @@ Variable_name Value
> Rpl_semi_sync_master_no_tx 302
> show status like 'Rpl_semi_sync_master_yes_tx';
> Variable_name Value
> -Rpl_semi_sync_master_yes_tx 301
> +Rpl_semi_sync_master_yes_tx 304
> show status like 'Rpl_semi_sync_master_clients';
> Variable_name Value
> Rpl_semi_sync_master_clients 1
> @@ -213,7 +232,7 @@ Variable_name Value
> Rpl_semi_sync_master_no_tx 302
> SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
> Variable_name Value
> -Rpl_semi_sync_master_yes_tx 302
> +Rpl_semi_sync_master_yes_tx 305
> FLUSH NO_WRITE_TO_BINLOG STATUS;
> [ Semi-sync master status variables after FLUSH STATUS ]
> SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
>
> === modified file 'mysql-test/suite/rpl/t/rpl_semi_sync.test'
> --- a/mysql-test/suite/rpl/t/rpl_semi_sync.test 2009-10-23 04:56:30 +0000
> +++ b/mysql-test/suite/rpl/t/rpl_semi_sync.test 2010-01-25 09:04:10 +0000
> @@ -11,6 +11,8 @@ disable_query_log;
> connection master;
> call mtr.add_suppression("Timeout waiting for reply of binlog");
> call mtr.add_suppression("Read semi-sync reply");
> +call mtr.add_suppression("binlog write out-of-order");
> +call mtr.add_suppression("Semi-sync failed to insert tranx_node for binlog file");
I thik both these errors should not happen for the test case.
> connection slave;
> call mtr.add_suppression("Master server does not support semi-sync");
> call mtr.add_suppression("Semi-sync slave .* reply");
> @@ -193,6 +195,34 @@ select count(distinct a) from t1;
> select min(a) from t1;
> select max(a) from t1;
>
> +--echo
> +--echo # BUG#50157
> +--echo # semi-sync replication crashes when replicating a transaction which
> +--echo # include 'CREATE TEMPORARY TABLE `MyISAM_t` SELECT * FROM `Innodb_t` ;
> +
> +connection master;
> +echo [ on master ];
> +SET SESSION AUTOCOMMIT= 0;
> +CREATE TABLE t2(c1 INT) ENGINE=innodb;
> +sync_slave_with_master;
> +
> +connection master;
> +BEGIN;
> +--echo
> +--echo # Even though it is in a transaction, this statement is binlogged into
> binlog
> +--echo # file immediately.
> +CREATE TEMPORARY TABLE t3 SELECT c1 FROM t2 where 1=1;
> +--echo
> +--echo # These statements will not binlogged until the transaction is committed
binlogged => be binlogged
> +INSERT INTO t2 VALUES(11);
> +INSERT INTO t2 VALUES(22);
> +COMMIT;
> +
> +DROP TABLE t2, t3;
> +SET SESSION AUTOCOMMIT= 1;
> +sync_slave_with_master;
> +
> +
> --echo #
> --echo # Test semi-sync master will switch OFF after one transacton
typo: transaction
> --echo # timeout waiting for slave reply.
>
> === modified file 'plugin/semisync/semisync_master.cc'
> --- a/plugin/semisync/semisync_master.cc 2009-12-04 05:43:38 +0000
> +++ b/plugin/semisync/semisync_master.cc 2010-01-25 09:04:10 +0000
> @@ -65,7 +65,7 @@ static int gettimeofday(struct timeval *
>
> ActiveTranx::ActiveTranx(pthread_mutex_t *lock,
> unsigned long trace_level)
> - : Trace(trace_level),
> + : Trace(trace_level), allocator_(max_connections),
> num_entries_(max_connections << 1), /* Transaction hash table size
> * is set to double the size
> * of max_connections */
> @@ -115,15 +115,9 @@ unsigned int ActiveTranx::get_hash_value
> return (hash1 + hash2) % num_entries_;
> }
>
> -ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node()
> +TranxNode* ActiveTranx::alloc_tranx_node()
> {
> - MYSQL_THD thd= (MYSQL_THD)current_thd;
> - /* The memory allocated for TranxNode will be automatically freed at
> - the end of the command of current THD. And because
> - ha_autocommit_or_rollback() will always be called before that, so
> - we are sure that the node will be removed from the active list
> - before it get freed. */
> - TranxNode *trx_node = (TranxNode *)thd_alloc(thd, sizeof(TranxNode));
> + TranxNode *trx_node = allocator_.allocate_node();
> if (trx_node)
> {
> trx_node->log_name_[0] = '\0';
> @@ -271,6 +265,7 @@ int ActiveTranx::clear_active_tranx_node
>
> /* Clear the hash table. */
> memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
> + allocator_.free_all_nodes();
>
> /* Clear the active transaction list. */
> if (trx_front_ != NULL)
> @@ -311,6 +306,7 @@ int ActiveTranx::clear_active_tranx_node
> }
>
> trx_front_ = new_front;
> + allocator_.free_nodes_before(trx_front_);
>
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
>
> === modified file 'plugin/semisync/semisync_master.h'
> --- a/plugin/semisync/semisync_master.h 2009-12-04 01:46:33 +0000
> +++ b/plugin/semisync/semisync_master.h 2010-01-25 09:04:10 +0000
> @@ -20,6 +20,148 @@
>
> #include "semisync.h"
>
> +struct TranxNode {
> + char log_name_[FN_REFLEN];
> + my_off_t log_pos_;
> + struct TranxNode *next_; /* the next node in the sorted list */
> + struct TranxNode *hash_next_; /* the next node during hash collision */
> +};
> +
> +#define TRANX_NODE_NUM 16
> +class TranxNodeAllocator
> +{
> +public:
> + /*
> + init_nodes: How many TranxNodes are usually need.
> + */
> + TranxNodeAllocator(uint init_nodes) :
> + block_num_max(init_nodes/TRANX_NODE_NUM +
> + (init_nodes % TRANX_NODE_NUM) > 1 ? 2 : 1),
> + last_node(-1), first_block(NULL), last_block(NULL),
> + current_block(NULL), block_num(0) {}
> +
> + ~TranxNodeAllocator()
> + {
> + Block *block= first_block;
> + while (block != NULL)
> + {
> + Block *next= block->next;
> + free_block(block);
> + block= next;
> + }
> + }
> +
> + /*
> + Return a TranxNode pointer which follows the last_node immediately.
> + */
> + TranxNode *allocate_node()
> + {
> + if (last_node == TRANX_NODE_NUM-1)
> + {
> + last_node= -1;
> + current_block= current_block->next;
> + }
> +
> + if (current_block == NULL && allocate_block())
> + return NULL;
should restore the values for last_node when allocate_block() failed.
> +
> + return &(current_block->nodes[++last_node]);
> + }
> +
> + int free_all_nodes()
> + {
> + current_block= first_block;
> + last_node= -1;
> + free_blocks();
> + return 0;
> + }
> +
> + int free_nodes_before(TranxNode* node)
> + {
> + Block *block;
> + Block *prev_block;
> +
> + block= first_block;
> + while (block != current_block->next)
> + {
> + if (&(block->nodes[0]) <= node &&
> &(block->nodes[TRANX_NODE_NUM]) >= node)
> + {
> + if (first_block != block)
> + {
> + last_block->next= first_block;
> + first_block= block;
> + last_block= prev_block;
> + last_block->next= NULL;
> + free_blocks();
> + }
> + return 0;
> + }
> + prev_block= block;
> + block= block->next;
> + }
this should never happen (node not found), add an assertion here.
> + return 1;
> + }
> +
> +private:
> + uint block_num_max;
> + int last_node;
> +
> + struct Block {
> + Block *next;
> + TranxNode nodes[TRANX_NODE_NUM];
> + };
> + Block *first_block;
> + Block *last_block;
> + Block *current_block;
> + uint block_num;
> +
> + int allocate_block()
> + {
> + Block *block= (Block *)my_malloc(sizeof(Block), MYF(0));
> + if (block)
> + {
> + block->next= NULL;
> +
> + if (first_block == NULL)
> + first_block= block;
> + else
> + last_block->next= block;
> +
> + /* New block always is put at the rear */
> + last_block= block;
> + /* New block always is the current_block */
> + current_block= block;
> + ++block_num;
> + return 0;
> + }
> + return 1;
> + }
> +
> + void free_block(Block *block)
> + {
> + my_free(block, MYF(0));
> + --block_num;
> + }
> +
> + void free_blocks()
> + {
> + if (current_block == NULL)
> + return;
> +
> + Block *block= current_block->next;
> + while (block_num > block_num_max && block != NULL)
> + {
> + Block *next= block->next;
> + free_block(block);
> + block= next;
> + }
> + current_block->next= block;
> + if (block == NULL)
> + last_block= current_block;
> + }
> +};
> +
> +
> /**
> This class manages memory for active transaction list.
>
> @@ -31,13 +173,8 @@
> class ActiveTranx
> :public Trace {
> private:
> - struct TranxNode {
> - char log_name_[FN_REFLEN];
> - my_off_t log_pos_;
> - struct TranxNode *next_; /* the next node in the sorted list */
> - struct TranxNode *hash_next_; /* the next node during hash collision */
> - };
>
> + TranxNodeAllocator allocator_;
> /* These two record the active transaction list in sort order. */
> TranxNode *trx_front_, *trx_rear_;
>
> @@ -54,18 +191,18 @@ private:
> unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
>
> int compare(const char *log_file_name1, my_off_t log_file_pos1,
> - const TranxNode *node2) {
> + const TranxNode *node2) {
> return compare(log_file_name1, log_file_pos1,
> - node2->log_name_, node2->log_pos_);
> + node2->log_name_, node2->log_pos_);
> }
> int compare(const TranxNode *node1,
> - const char *log_file_name2, my_off_t log_file_pos2) {
> + const char *log_file_name2, my_off_t log_file_pos2) {
> return compare(node1->log_name_, node1->log_pos_,
> - log_file_name2, log_file_pos2);
> + log_file_name2, log_file_pos2);
> }
> int compare(const TranxNode *node1, const TranxNode *node2) {
> return compare(node1->log_name_, node1->log_pos_,
> - node2->log_name_, node2->log_pos_);
> + node2->log_name_, node2->log_pos_);
> }
>
> public:
> @@ -88,7 +225,7 @@ public:
> * 0: success; non-zero: error
> */
> int clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos);
> + my_off_t log_file_pos);
>
> /* Given a position, check to see whether the position is an active
> * transaction's ending position by probing the hash table.
> @@ -99,7 +236,7 @@ public:
> * (file_name, file_position).
> */
> static int compare(const char *log_file_name1, my_off_t log_file_pos1,
> - const char *log_file_name2, my_off_t log_file_pos2);
> + const char *log_file_name2, my_off_t log_file_pos2);
>
> };
>
>
> === modified file 'sql/rpl_handler.cc'
> --- a/sql/rpl_handler.cc 2009-12-04 01:46:33 +0000
> +++ b/sql/rpl_handler.cc 2010-01-25 09:04:10 +0000
> @@ -190,8 +190,8 @@ int Trans_delegate::after_commit(THD *th
> {
> Trans_param param;
> bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
> - if (is_real_trans)
> - param.flags |= TRANS_IS_REAL_TRANS;
> +
> + param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
>
> Trans_binlog_info *log_info=
> my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
> @@ -218,8 +218,8 @@ int Trans_delegate::after_rollback(THD *
> {
> Trans_param param;
> bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
> - if (is_real_trans)
> - param.flags |= TRANS_IS_REAL_TRANS;
> +
> + param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
>
> Trans_binlog_info *log_info=
> my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
> @@ -228,7 +228,7 @@ int Trans_delegate::after_rollback(THD *
> param.log_pos= log_info ? log_info->log_pos : 0;
>
> int ret= 0;
> - FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
> + FOREACH_OBSERVER(ret, after_rollback, thd, (¶m));
>
> /*
> This is the end of a real transaction or autocommit statement, we
> @@ -249,6 +249,7 @@ int Binlog_storage_delegate::after_flush
> {
> Binlog_storage_param param;
> uint32 flags=0;
> +
> if (synced)
> flags |= BINLOG_STORAGE_IS_SYNCED;
>
>