Hi!
>>>>> "sanja" == sanja <sanja@stripped> writes:
<cut>
sanja> === modified file 'storage/maria/ma_loghandler.c'
sanja> --- a/storage/maria/ma_loghandler.c 2008-05-06 23:29:12 +0000
sanja> +++ b/storage/maria/ma_loghandler.c 2008-05-22 21:53:17 +0000
sanja> @@ -99,7 +99,13 @@
sanja> pagecache_inject()
sanja> */
sanja> uchar buffer[TRANSLOG_WRITE_BUFFER];
sanja> + /*
sanja> + Maximum LSN of records which ends in this buffer (or IMPOSSIBLE_LSN
sanja> + if no LSNs ends here)
sanja> + */
sanja> LSN last_lsn;
sanja> + /* last_lsn of previous buffer of IMPOSSIBLE_LSN if it is very first one */
of -> or
sanja> + LSN prev_last_lsn;
<cut>
sanja> @@ -244,6 +250,10 @@
sanja> File directory_fd;
sanja> /* buffers for log writing */
sanja> struct st_translog_buffer buffers[TRANSLOG_BUFFERS_NO];
sanja> + /* Mask where 1 in position N mean that buffer N is not flushed */
sanja> + uint dirty_buffer_mask;
How many buffers do we need at most?
(The size of uint is not well defined.)
<cut>
sanja> @@ -1372,7 +1388,9 @@
sanja> static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
sanja> {
sanja> DBUG_ENTER("translog_buffer_init");
sanja> - buffer->last_lsn= LSN_IMPOSSIBLE;
sanja> + buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
sanja> + DBUG_PRINT("info", ("last_lsn & prev_last_lsn set to 0, buffer: 0x%lx",
sanja> + (ulong) buffer));
Remove space before &
sanja> /* This Buffer File */
sanja> buffer->file= NULL;
sanja> buffer->overlay= 0;
sanja> @@ -1970,7 +1988,9 @@
sanja> (ulong) LSN_OFFSET(log_descriptor.horizon),
sanja> (ulong) LSN_OFFSET(log_descriptor.horizon)));
sanja> DBUG_ASSERT(buffer_no == buffer->buffer_no);
sanja> - buffer->last_lsn= LSN_IMPOSSIBLE;
sanja> + buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
sanja> + DBUG_PRINT("info", ("last_lsn & prev_last_lsn set to 0, buffer: 0x%lx",
sanja> + (ulong) buffer));
Remove space before &
sanja> @@ -2044,7 +2068,6 @@
sanja> if (new_file)
sanja> {
sanja> -
sanja> /* move the horizon to the next file and its header page */
sanja> (*horizon)+= LSN_ONE_FILE;
sanja> (*horizon)= LSN_REPLACE_OFFSET(*horizon, TRANSLOG_PAGE_SIZE);
sanja> @@ -2063,6 +2086,13 @@
sanja> translog_start_buffer(new_buffer, cursor, new_buffer_no);
sanja> }
sanja> log_descriptor.buffers[old_buffer_no].next_buffer_offset=
> new_buffer->offset;
sanja> + new_buffer->prev_last_lsn=
sanja> + ((log_descriptor.buffers[old_buffer_no].last_lsn != LSN_IMPOSSIBLE) ?
sanja> + log_descriptor.buffers[old_buffer_no].last_lsn :
sanja> + log_descriptor.buffers[old_buffer_no].prev_last_lsn);
You could introduce a new variable 'old_buffer' that is set to
log_descriptor.buffers[old_buffer_no]. This would make the above lines
a bit shorter and easier to read.
sanja> + DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx), buffer: 0x%lx",
Please change the "," before "buffer" to a space
sanja> @@ -2390,7 +2421,6 @@
sanja> TRANSLOG_FILE *file= buffer->file;
sanja> uint8 ver= buffer->ver;
sanja> DBUG_ENTER("translog_buffer_flush");
sanja> - DBUG_ASSERT(buffer->file != NULL);
sanja> DBUG_PRINT("enter",
sanja> ("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
sanja> (uint) buffer->buffer_no, (ulong) buffer,
sanja> @@ -2399,6 +2429,9 @@
sanja> (ulong) buffer->size));
sanja> translog_buffer_lock_assert_owner(buffer);
sanja> + if (buffer->file == NULL)
sanja> + goto out;
sanja> +
Why do you need to unmark a buffer with file == NULL ?
Who could have marked this for flush ?
At the time file was set to NULL, wasn't the file already unmarked ?
If this is correct, please add a comment that answers the above.
It would be nice to get rid of the above because:
- It will be common that someone else flushes what you are going to
look at.
- Taking an extra mutex + signal when not needed is expensive
<cut>
sanja> +out:
sanja> + pthread_mutex_lock(&log_descriptor.dirty_buffer_mask_lock);
sanja> + log_descriptor.dirty_buffer_mask&= ~(1 << buffer->buffer_no);
sanja> + pthread_mutex_unlock(&log_descriptor.dirty_buffer_mask_lock);
sanja> pthread_cond_broadcast(&buffer->waiting_filling_buffer);
sanja> DBUG_RETURN(0);
sanja> }
<cut>
sanja> @@ -5559,6 +5620,10 @@
sanja> */
sanja> translog_lock();
sanja> set_lsn(lsn, horizon);
sanja> + buffer_of_last_lsn->last_lsn= *lsn;
sanja> + DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx), buffer: 0x%lx",
Change "," before buffer to space
sanja> + LSN_IN_PARTS(buffer_of_last_lsn->last_lsn),
sanja> + (ulong) buffer_of_last_lsn));
sanja> if (log_record_type_descriptor[type].inwrite_hook &&
sanja> (*log_record_type_descriptor[type].inwrite_hook) (type, trn,
sanja> tbl_info,
sanja> @@ -5868,6 +5931,9 @@
sanja> parts->total_record_length, parts);
sanja> log_descriptor.bc.buffer->last_lsn= *lsn;
sanja> + DBUG_PRINT("info", ("last_lsn set to (%lu,0x%lx), buffer: 0x%lx",
Change "," before buffer to space
sanja> + LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn),
sanja> + (ulong) log_descriptor.bc.buffer));
<cut>
sanja> +void translog_flush_wait_for_end(LSN lsn)
sanja> +{
sanja> + DBUG_ENTER("translog_flush_wait_for_end");
sanja> + DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
Add an assert to check that one owns the log_descriptor.log_flush_lock
lock
sanja> + while (cmp_translog_addr(log_descriptor.flushed, lsn) < 0)
sanja> + pthread_cond_wait(&log_descriptor.log_flush_cond,
sanja> + &log_descriptor.log_flush_lock);
sanja> + DBUG_VOID_RETURN;
sanja> +}
sanja> +
sanja> +
sanja> +/**
sanja> + @brief Sets goal for the next flush pass and waits for this pass end.
sanja> +
sanja> + @param lsn log record serial number up to which (inclusive)
sanja> + the log has to be flushed
sanja> +*/
sanja> +
sanja> +void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
sanja> +{
sanja> + DBUG_ENTER("translog_flush_set_new_goal_and_wait");
sanja> + DBUG_PRINT("enter", ("LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
Add an assert to check that one owns the log_descriptor.log_flush_lock
lock
sanja> + if (cmp_translog_addr(lsn, log_descriptor.next_pass_max_lsn) > 0)
sanja> + {
sanja> + log_descriptor.next_pass_max_lsn= lsn;
sanja> + log_descriptor.max_lsn_requester= pthread_self();
sanja> + }
sanja> + while (log_descriptor.flush_in_progress)
sanja> + {
sanja> + pthread_cond_wait(&log_descriptor.log_flush_cond,
sanja> + &log_descriptor.log_flush_lock);
sanja> + }
sanja> + DBUG_VOID_RETURN;
sanja> +}
<cut>
sanja> my_bool translog_flush(TRANSLOG_ADDRESS lsn)
sanja> {
sanja> - LSN old_flushed, sent_to_disk;
sanja> + LSN sent_to_disk= LSN_IMPOSSIBLE;
sanja> TRANSLOG_ADDRESS flush_horizon;
sanja> - int rc= 0;
sanja> - /* We can't have more different files then buffers */
sanja> - TRANSLOG_FILE *file_handlers[TRANSLOG_BUFFERS_NO];
sanja> - int current_file_handler= -1;
sanja> - uint32 prev_file= 0;
sanja> - my_bool full_circle= 0;
sanja> + uint fn, i;
sanja> + uint dirty_buffer_mask;
sanja> + uint8 last_buffer_no, start_buffer_no;
sanja> + my_bool rc= 0;
sanja> DBUG_ENTER("translog_flush");
sanja> DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
sanja> DBUG_ASSERT(translog_status == TRANSLOG_OK ||
sanja> translog_status == TRANSLOG_READONLY);
sanja> + compile_time_assert(sizeof(dirty_buffer_mask) ==
sanja> + sizeof(log_descriptor.dirty_buffer_mask));
You could also have a unique typedef for this type to avoid the above assert
sanja> pthread_mutex_lock(&log_descriptor.log_flush_lock);
sanja> + DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
Remove one of the two spaces in the double space above.
sanja> +
sanja> + /*
sanja> + We will recheck information when will lock buffers one by
sanja> + one so we can use unprotected read here (this is just for
sanja> + speed up buffers processing)
sanja> + */
sanja> + dirty_buffer_mask= log_descriptor.dirty_buffer_mask;
sanja> + DBUG_PRINT("info", ("Dirty buffer mask: %lx current buffer: %u",
sanja> + (ulong) dirty_buffer_mask,
sanja> + (uint) log_descriptor.bc.buffer_no));
sanja> + for (i= (log_descriptor.bc.buffer_no + 1) % TRANSLOG_BUFFERS_NO;
sanja> + i != log_descriptor.bc.buffer_no && !(dirty_buffer_mask & (1
> << i));
sanja> + i= (i + 1) % TRANSLOG_BUFFERS_NO) {}
sanja> + start_buffer_no= i;
For the above operations to be fast, you should ensure that
TRANSLOG_BUFFERS_NO is a power of 2, like 8 !
You can avoid the test: i != log_descriptor.bc.buffer_no by doing
before the for clause:
/* Ensure that we stop when we hit bc.buffer_no */
dirty_buffer_mask|= (1 << log_descriptor.bc.buffer_no);
sanja> + /* if we have to flush last buffer then we will finish it */
sanja> + if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) >
> 0)
sanja> {
sanja> + lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon
> */
sanja> + last_buffer_no= log_descriptor.bc.buffer_no;
sanja> + log_descriptor.is_everything_flushed= 1;
sanja> + translog_force_current_buffer_to_finish();
sanja> + translog_buffer_unlock(buffer);
sanja> + }
sanja> + else
sanja> + {
sanja> + last_buffer_no= (log_descriptor.bc.buffer_no == 0 ?
sanja> + TRANSLOG_BUFFERS_NO -1 :
sanja> + log_descriptor.bc.buffer_no - 1);
Same as:
last_buffer_no= ((log_descriptor.bc.buffer_no + TRANSLOG_BUFFERS_NO-1) %
TRANSLOG_BUFFERS_NO);
This will be even faster if TRANSLOG_BUFFERS_NO is a power of 2.
sanja> + translog_unlock();
sanja> + }
sanja> + sent_to_disk= translog_get_sent_to_disk();
sanja> + if (cmp_translog_addr(lsn, sent_to_disk) > 0)
sanja> + {
sanja> +
sanja> + DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u",
sanja> + (uint) start_buffer_no, (uint) last_buffer_no));
sanja> + last_buffer_no= (last_buffer_no + 1) % TRANSLOG_BUFFERS_NO;
sanja> + i= start_buffer_no;
sanja> + do
sanja> + {
sanja> + translog_buffer_lock(log_descriptor.buffers + i);
sanja> + DBUG_PRINT("info", ("Check buffer: 0x%lx #: %u "
sanja> + "prev last LSN: (%lu,0x%lx) "
sanja> + "last LSN: (%lu,0x%lx) status: %s",
sanja> + (ulong)(log_descriptor.buffers + i),
sanja> + (uint) i,
sanja> +
> LSN_IN_PARTS(log_descriptor.buffers[i].prev_last_lsn),
sanja> + LSN_IN_PARTS(log_descriptor.buffers[i].last_lsn),
sanja> + (log_descriptor.buffers[i].file ?
sanja> + "dirty" : "closed")));
sanja> + if (log_descriptor.buffers[i].prev_last_lsn <= lsn &&
sanja> + log_descriptor.buffers[i].file != NULL)
sanja> + {
sanja> + struct st_translog_buffer *buffer= log_descriptor.buffers + i;
sanja> + DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
sanja> + flush_horizon= buffer->offset + buffer->size;
sanja> + translog_buffer_flush(log_descriptor.buffers + i);
sanja> + }
sanja> + translog_buffer_unlock(log_descriptor.buffers + i);
sanja> + i= (i + 1) % TRANSLOG_BUFFERS_NO;
sanja> + } while (i != last_buffer_no);
Here you loop over start_buffer_no to last_buffer_no. What happens if
start_buffer_no was wrong because you read the dirty buffers without a
mutex and you read the value 0 when the real value should be 1?
In other words, how do you guarantee that all buffers that need to be
flushed are flushed?
<cut>
sanja> out:
sanja> - pthread_mutex_unlock(&log_descriptor.log_flush_lock);
sanja> + pthread_mutex_lock(&log_descriptor.log_flush_lock);
sanja> + if (sent_to_disk != LSN_IMPOSSIBLE)
sanja> + log_descriptor.flushed= sent_to_disk;
sanja> + log_descriptor.flush_in_progress= 0;
sanja> + DBUG_PRINT("info", ("flush_in_progress is dropped"));
sanja> + pthread_cond_broadcast(&log_descriptor.log_flush_cond);
sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
Please move broadcast out of mutex (faster)
sanja> +++ b/storage/maria/unittest/ma_test_loghandler_multithread-t.c 2008-05-22
> 21:53:17 +0000
<cut>
sanja> +static void *test_thread_flusher(void *arg)
sanja> +{
sanja> + int param= *((int*) arg);
sanja> + int i;
sanja> +
sanja> + my_thread_init();
sanja> +
sanja> + for(i= 0; i < FLUSH_ITERATIONS; i++)
sanja> + {
sanja> + translog_flush(last_lsn);
sanja> + pthread_mutex_lock(&LOCK_thread_count);
sanja> + ok(1, "-- flush %d", param);
sanja> + pthread_mutex_unlock(&LOCK_thread_count);
sanja> + }
I don't see why you are testing by calling with the same last_lsn many
times, but I assume it doesn't do any harm.
<cut>
Not much to comment on; a couple of small style issues and a couple
of questions.
Regards,
Monty