Hi!
Here are the high-level functions of the proposed method:
/**
  @brief Flush the log up to given LSN (included)

  Takes the log lock, then dispatches on log_descriptor.flush_status:
  a free log makes this thread the flush leader; a flush in progress makes
  it take part in the current pass and, if needed, register for the next
  one; a flush that is ending makes it wait and re-evaluate from the top.
  Calls annotated "unlock handler" are marked by the author as releasing
  the log lock themselves.

  @param  lsn   log record serial number up to which (inclusive)
                the log has to be flushed

  @return Operation status
    @retval 0      OK
    @retval 1      Error

  @todo: remove serialization and make group commit.
*/

my_bool translog_flush(TRANSLOG_ADDRESS lsn)
{
  my_bool rc= 0;
  my_bool need_second_pass;
  DBUG_ENTER("translog_flush");
  DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
  DBUG_ASSERT(translog_status == TRANSLOG_OK ||
              translog_status == TRANSLOG_READONLY);
  LINT_INIT(need_second_pass);

  translog_lock();
retry:
  /* Re-checked after every wait: another pass may have covered our LSN */
  if (translog_is_already_flushed(lsn))
  {
    DBUG_PRINT("info", ("The LSN is already flushed"));
    translog_unlock();
    DBUG_RETURN(0);
  }

  switch (log_descriptor.flush_status) {
  case FLUSHSTATUS_FREE:
    /* Nobody is flushing: become the leader of a new flush */
    translog_set_status(FLUSHSTATUS_LEADER_DETECT);
    translog_set_current_flush_goals(lsn);      /* unlock handler */
    rc= translog_flush_as_master();
    break;
  case FLUSHSTATUS_LEADER_DETECT:
    /* A leader is being set up: wait until its pass actually starts */
    translog_waitfor_status_change(FLUSHSTATUS_FLUSH_PASS);
    /* fall through */
  case FLUSHSTATUS_FLUSH_PASS:
    translog_register_next_flush_pass_needs(&need_second_pass, lsn);
    if ((rc= translog_flush_pass()))            /* unlock handler */
      break;
    translog_lock();
    rc= translog_next_pass_preparations(need_second_pass); /* unlock handler */
    break;
  case FLUSHSTATUS_FLUSH_FINISHING:
    /* Too late to join the current pass: only register for the next one */
    translog_register_next_flush_pass_needs(&need_second_pass, lsn);
    rc= translog_next_pass_preparations(need_second_pass); /* unlock handler */
    break;
  case FLUSHSTATUS_FLUSH_END:
  case FLUSHSTATUS_NEW_LEADER:
    translog_waitfor_status_change(FLUSHSTATUS_LEADER_DETECT |
                                   FLUSHSTATUS_FREE);
    goto retry;
  default:
    DBUG_ASSERT(0);
    /*
      Unknown state (only reachable when asserts are compiled out):
      nothing was flushed, so release the lock taken above and report
      failure instead of pretending success.
    */
    translog_unlock();
    rc= 1;
    break;
  }

  /*
    Every DBUG_ENTER must be paired with a DBUG_RETURN, and a non-void
    function must not fall off its end.
  */
  DBUG_RETURN(rc);
}
/**
  @brief Master thread flush pass

  Announces the flush pass, takes part in it, waits for the pass to reach
  FLUSHSTATUS_FLUSH_END, syncs the flushed files and then either hands
  leadership over (goals are registered for a next pass) or marks the
  flush machinery free.

  @note Takes the log lock itself, so it must be called without the lock.
  The lock is released before returning on every path: neither caller in
  this file unlocks after this function returns.

  @return Operation status
    @retval 0      OK
    @retval 1      Error
*/

static my_bool translog_flush_as_master(void)
{
  DBUG_ENTER("translog_flush_as_master");

  translog_lock();
  translog_set_status(FLUSHSTATUS_FLUSH_PASS);
  if (translog_flush_pass())                    /* unlock handler */
    DBUG_RETURN(1);

  translog_lock();
  translog_waitfor_status_change(FLUSHSTATUS_FLUSH_END);
  if (translog_sync_flushed_files())
  {
    /* Do not leave the lock re-taken above held on the error path */
    translog_unlock();
    DBUG_RETURN(1);
  }

  translog_clear_current_pass_goals();
  if (translog_is_next_pass_goals())
    translog_set_status(FLUSHSTATUS_NEW_LEADER);
  else
    translog_set_status(FLUSHSTATUS_FREE);
  /* Status is published; release the lock so waiting threads can proceed */
  translog_unlock();
  DBUG_RETURN(0);
}
/**
  @brief Wait and prepare for next flush pass for threads who registered
  their flush needs for next pass

  Both callers in this file invoke it with the log lock held; it releases
  the lock before returning (the callers annotate it "unlock handler").

  @param  need_second_pass  TRUE if we really need second pass

  @return Operation status
    @retval 0      OK
    @retval 1      Error
*/

static my_bool translog_next_pass_preparations(my_bool need_second_pass)
{
  my_bool rc= 0;
  DBUG_ENTER("translog_next_pass_preparations");

  if (!need_second_pass)
  {
    /* Our LSN is covered by the current pass: just wait for it to finish */
    translog_waitfor_status_change(FLUSHSTATUS_NEW_LEADER |
                                   FLUSHSTATUS_FREE);
    translog_unlock();
    DBUG_RETURN(0);
  }

  translog_waitfor_status_change(FLUSHSTATUS_NEW_LEADER);
  /*
    Keep the lock here: both branches below still need it. Unlocking at
    this point would make the leader branch unlock twice and would make
    the follower branch wait and enter translog_flush_pass() (which
    unlocks on its own) without owning the lock.
  */
  if (pthread_equal(log_descriptor.next_lead, pthread_self()))
  {
    /* This thread was chosen to lead the next pass */
    translog_set_status(FLUSHSTATUS_LEADER_DETECT);
    translog_unlock();
    translog_copy_current_flush_pass_goals();
    rc= translog_flush_as_master();
  }
  else
  {
    /* Another thread leads: join its pass and wait for it to complete */
    translog_waitfor_status_change(FLUSHSTATUS_FLUSH_PASS);
    if (!(rc= translog_flush_pass()))           /* unlock handler */
    {
      translog_lock();
      translog_waitfor_status_change(FLUSHSTATUS_NEW_LEADER |
                                     FLUSHSTATUS_FREE);
      translog_unlock();
    }
  }
  DBUG_RETURN(rc);
}