Below is the list of changes that have just been committed into a local
5.2 repository of brian. When brian does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-10-22 19:12:31-07:00, brian@stripped +3 -0
Modified library to use circular queue for reading. The thread testing tool has been updated to have command line options.
storage/archive/azio.c@stripped, 2007-10-22 19:12:29-07:00, brian@stripped +40 -21
Circular queue update.
storage/archive/azio.h@stripped, 2007-10-22 19:12:29-07:00, brian@stripped +2 -1
Update for circular queue
storage/archive/concurrency_test.c@stripped, 2007-10-22 19:12:29-07:00, brian@stripped +184 -36
Made most options now command line modifieable.
diff -Nrup a/storage/archive/azio.c b/storage/archive/azio.c
--- a/storage/archive/azio.c 2007-10-22 16:49:42 -07:00
+++ b/storage/archive/azio.c 2007-10-22 19:12:29 -07:00
@@ -134,6 +134,18 @@ static az_thread_type azio_ready(azio_st
return temp;
}
+static void azio_fetch(azio_stream *s, azio_bucket_st *bucket)
+{
+ unsigned int wait= 1;
+
+ while (wait)
+ {
+ pthread_mutex_lock(&s->container.thresh_mutex);
+ wait= (bucket->state == AZ_STATE_COLD) ? 0 : 1 ;
+ pthread_mutex_unlock(&s->container.thresh_mutex);
+ }
+}
+
static int azio_start(azio_stream *s)
{
int rc= 0;
@@ -154,10 +166,11 @@ static int azio_start(azio_stream *s)
return rc;
}
-static int azio_read(azio_stream *s, azio_bucket_st *bucket)
+static int azio_read(azio_stream *s, azio_bucket_st *bucket, size_t offset)
{
pthread_mutex_lock(&s->container.thresh_mutex);
s->container.ready= AZ_THREAD_ACTIVE;
+ bucket->offset= offset;
bucket->state= AZ_STATE_HOT;
pthread_mutex_unlock(&s->container.thresh_mutex);
pthread_cond_broadcast(&s->container.threshhold);
@@ -190,10 +203,6 @@ int azopen(azio_stream *s, const char *p
s->container.ready= AZ_THREAD_FINISHED;
- s->inbuf= &s->bucket[0];
- s->outbuf= &s->bucket2[0];
- s->stream.next_in = s->inbuf->buffer;
- s->stream.next_out = s->outbuf->buffer;
s->z_err = Z_OK;
s->back = EOF;
s->crc = crc32(0L, Z_NULL, 0);
@@ -212,6 +221,20 @@ int azopen(azio_stream *s, const char *p
if (Flags & O_RDWR)
s->mode = 'w';
+ /* Based on mode, we decide who gets the reserved bucket */
+ if (s->mode == 'r')
+ {
+ s->inbuf= &s->bucket[0];
+ s->outbuf= &s->reserved;
+ }
+ else
+ {
+ s->inbuf= &s->reserved;
+ s->outbuf= &s->bucket[0];
+ }
+ s->stream.next_in = s->inbuf->buffer;
+ s->stream.next_out = s->outbuf->buffer;
+
if (s->mode == 'w')
{
err = deflateInit2(&(s->stream), level,
@@ -297,6 +320,10 @@ int azopen(azio_stream *s, const char *p
s->bucket[x].state= AZ_STATE_COLD;
s->bucket[x].fd= s->file;
s->bucket[x].thresh_mutex= &s->container.thresh_mutex;
+ if (x == AZ_BUCKET_NUM -1)
+ s->bucket[x].next= &s->bucket[0];
+ else
+ s->bucket[x].next= &s->bucket[x + 1];
}
return 1;
@@ -784,13 +811,9 @@ int azread_init(azio_stream *s)
/* Put one in the chamber */
if (s->method != AZ_METHOD_BLOCK)
{
- azio_bucket_st *bucket;
- do_aio_cleanup(s);
-
- bucket= &s->bucket[0];
- s->inbuf= &s->bucket[1];
- bucket->offset= s->pos;
- azio_read(s, bucket);
+ azio_fetch(s, &s->bucket[0]);
+ s->inbuf= &s->bucket[0];
+ azio_read(s, s->inbuf->next, s->pos);
s->aio_inited= 1;
}
@@ -1067,11 +1090,10 @@ static void get_block(azio_stream *s)
&& s->pos < s->check_point
&& s->aio_inited)
{
- azio_bucket_st *bucket;
-
- azio_ready(s);
- bucket= (s->inbuf == &s->bucket[0]) ? &s->bucket[1] : &s->bucket[0];
- s->stream.avail_in= (unsigned int)bucket->read_size;
+ azio_fetch(s, s->inbuf->next);
+ /* Shift to the next bucket in the queue */
+ s->inbuf= s->inbuf->next;
+ s->stream.avail_in= (unsigned int)s->inbuf->read_size;
if ((int)(s->stream.avail_in) == -1)
goto use_pread;
else if ((int)(s->stream.avail_in) == 0)
@@ -1080,16 +1102,13 @@ static void get_block(azio_stream *s)
return;
}
s->pos+= s->stream.avail_in;
- s->inbuf= bucket;
/* We only aio_read when we know there is more data to be read */
if (s->pos >= s->check_point)
{
s->aio_inited= 0;
return;
}
- bucket= (s->inbuf == &s->bucket[0]) ? &s->bucket[1] : &s->bucket[0];
- bucket->offset= s->pos;
- azio_read(s, bucket);
+ azio_read(s, s->inbuf->next, s->pos);
}
else
{
diff -Nrup a/storage/archive/azio.h b/storage/archive/azio.h
--- a/storage/archive/azio.h 2007-10-22 16:49:42 -07:00
+++ b/storage/archive/azio.h 2007-10-22 19:12:29 -07:00
@@ -244,6 +244,7 @@ struct azio_bucket_st {
size_t read_size;
az_state state;
pthread_mutex_t *thresh_mutex;
+ azio_bucket_st *next;
};
struct azio_container_st {
@@ -262,7 +263,7 @@ typedef struct azio_stream {
azio_bucket_st *inbuf; /* input buffer */
azio_bucket_st *outbuf; /* output buffer */
azio_bucket_st bucket[AZ_BUCKET_NUM];
- azio_bucket_st bucket2[1];
+ azio_bucket_st reserved;
int aio_inited; /* Are we good to go */
uLong crc; /* crc32 of uncompressed data */
char *msg; /* error message */
diff -Nrup a/storage/archive/concurrency_test.c b/storage/archive/concurrency_test.c
--- a/storage/archive/concurrency_test.c 2007-10-21 14:19:49 -07:00
+++ b/storage/archive/concurrency_test.c 2007-10-22 19:12:29 -07:00
@@ -19,6 +19,8 @@
#ifndef __WIN__
#include <sys/wait.h>
#endif
+#include <help_start.h>
+
#ifdef __WIN__
#define srandom srand
@@ -29,12 +31,29 @@
#include "azio.h"
#define DEFAULT_INITIAL_LOAD 10000
+#define DEFAULT_READ_RATIO 1
+#define DEFAULT_WRITE_RATIO 1000
#define DEFAULT_EXECUTE_SECONDS 120
#define DEFAULT_CONCURRENCY 8
-#define TEST_FILENAME "concurrency_test.az"
+#define TEST_FILENAME "/dev/shm/concurrency_test.az"
#define HUGE_STRING_LENGTH 8192
+enum options_client
+{
+ OPT_CONCURRENCY,
+ OPT_READ,
+ OPT_WRITE,
+ OPT_TIMER,
+ OPT_INITIAL
+};
+
+static unsigned long long opt_initial_rows;
+static unsigned int opt_to_write;
+static unsigned int opt_to_read;
+static unsigned int opt_seconds;
+static unsigned int opt_concurrency;
+
/* Global Thread counter */
unsigned int thread_counter;
pthread_mutex_t counter_mutex;
@@ -48,18 +67,26 @@ pthread_cond_t timer_alarm_threshold;
pthread_mutex_t row_lock;
+static char **defaults_argv;
+static const char *load_default_groups[]= { 0 };
+const char *default_dbug_option="d:t:o,/tmp/concurrency_test.trace";
+
/* Prototypes */
void *run_task(void *p);
void *timer_thread(void *p);
-void scheduler(az_method use_aio);
-void create_data_file(azio_stream *write_handler, unsigned long long rows);
+void scheduler(unsigned int use_aio);
+void create_data_file(azio_stream *write_handler, unsigned long long rows, unsigned int use_aio);
unsigned int write_row(azio_stream *s);
+static int get_options(int *argc,char ***argv);
+
typedef struct thread_context_st thread_context_st;
struct thread_context_st {
unsigned int how_often_to_write;
- unsigned long long counter;
- az_method use_aio;
+ unsigned int how_often_to_read;
+ unsigned long long read_counter;
+ unsigned long long write_counter;
+ unsigned int use_aio;
azio_stream *writer;
};
@@ -81,13 +108,16 @@ static void get_random_string(char *buff
int main(int argc, char *argv[])
{
- az_method method;
my_init();
MY_INIT(argv[0]);
- if (argc > 1)
+ if (get_options(&argc, &argv))
+ {
+ free_defaults(defaults_argv);
+ my_end(0);
exit(1);
+ }
if (!(mysql_thread_safe()))
fprintf(stderr, "This application was compiled incorrectly. Please recompile with thread support.\n");
@@ -102,8 +132,10 @@ int main(int argc, char *argv[])
VOID(pthread_cond_init(&timer_alarm_threshold, NULL));
VOID(pthread_mutex_init(&row_lock, NULL));
- for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
- scheduler(method);
+ printf("Running with blocking\n");
+ scheduler(0);
+ printf("Running with non-blocking\n");
+ scheduler(1);
(void)pthread_mutex_destroy(&counter_mutex);
(void)pthread_cond_destroy(&count_threshhold);
@@ -116,10 +148,11 @@ int main(int argc, char *argv[])
return 0;
}
-void scheduler(az_method use_aio)
+void scheduler(unsigned int use_aio)
{
unsigned int x;
unsigned long long total;
+ unsigned long long writer_total;
azio_stream writer_handle;
thread_context_st *context;
pthread_t mainthread; /* Thread descriptor */
@@ -132,14 +165,14 @@ void scheduler(az_method use_aio)
pthread_mutex_lock(&counter_mutex);
thread_counter= 0;
- create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD);
+ create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD, use_aio);
pthread_mutex_lock(&sleeper_mutex);
master_wakeup= 1;
pthread_mutex_unlock(&sleeper_mutex);
- context= (thread_context_st *)malloc(sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
- bzero(context, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
+ context= (thread_context_st *)malloc(sizeof(thread_context_st) * opt_concurrency);
+ bzero(context, sizeof(thread_context_st) * opt_concurrency);
if (!context)
{
@@ -147,17 +180,21 @@ void scheduler(az_method use_aio)
exit(1);
}
- for (x= 0; x < DEFAULT_CONCURRENCY; x++)
+ printf("Creating Threads\n");
+
+ for (x= 0; x < opt_concurrency; x++)
{
- context[x].how_often_to_write= random()%1000;
+ context[x].how_often_to_write= opt_to_write;
+ context[x].how_often_to_read= opt_to_read;
context[x].writer= &writer_handle;
- context[x].counter= 0;
+ context[x].read_counter= 0;
+ context[x].write_counter= 0;
context[x].use_aio= use_aio;
/* now you create the thread */
if (pthread_create(&mainthread, &attr, run_task,
- (void *)context) != 0)
+ (void *)&context[x]) != 0)
{
fprintf(stderr,"Could not create thread\n");
exit(1);
@@ -165,9 +202,9 @@ void scheduler(az_method use_aio)
thread_counter++;
}
- if (DEFAULT_EXECUTE_SECONDS)
+ if (opt_seconds)
{
- time_t opt_timer_length= DEFAULT_EXECUTE_SECONDS;
+ time_t opt_timer_length= (time_t)opt_seconds;
pthread_mutex_lock(&timer_alarm_mutex);
timer_alarm= TRUE;
pthread_mutex_unlock(&timer_alarm_mutex);
@@ -188,6 +225,8 @@ void scheduler(az_method use_aio)
pthread_mutex_unlock(&sleeper_mutex);
pthread_cond_broadcast(&sleep_threshhold);
+ printf("Running Threads\n");
+
/*
We loop until we know that all children have cleaned up.
*/
@@ -203,13 +242,17 @@ void scheduler(az_method use_aio)
}
pthread_mutex_unlock(&counter_mutex);
- for (total= x= 0; x < DEFAULT_CONCURRENCY; x++)
- total+= context[x].counter;
+ for (total= x= 0; x < opt_concurrency; x++)
+ total+= context[x].read_counter;
+
+ for (writer_total= x= 0; x < opt_concurrency; x++)
+ writer_total+= context[x].write_counter;
free(context);
azclose(&writer_handle);
- printf("Read %llu rows\n", total);
+ printf("\tRead %llu rows\n", total);
+ printf("\tWrote %llu rows\n", writer_total);
}
void *timer_thread(void *p)
@@ -241,6 +284,8 @@ void *timer_thread(void *p)
pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
pthread_mutex_unlock(&timer_alarm_mutex);
+ printf("Now alarming Threads\n");
+
pthread_mutex_lock(&timer_alarm_mutex);
timer_alarm= FALSE;
pthread_mutex_unlock(&timer_alarm_mutex);
@@ -256,6 +301,7 @@ void *run_task(void *p)
unsigned long long count;
int ret;
int error;
+ my_bool timer_temp= FALSE;
azio_stream reader_handle;
if (mysql_thread_init())
@@ -264,11 +310,14 @@ void *run_task(void *p)
exit(1);
}
- if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY,
- context->use_aio)))
+ if (context->how_often_to_read)
{
- printf("Could not open test file\n");
- return 0;
+ if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY,
+ context->use_aio ? AZ_METHOD_AIO: AZ_METHOD_BLOCK)))
+ {
+ printf("Could not open test file\n");
+ return 0;
+ }
}
pthread_mutex_lock(&sleeper_mutex);
@@ -280,40 +329,62 @@ void *run_task(void *p)
/* Do Stuff */
count= 0;
+ timer_temp= timer_alarm;
while (1)
{
- azread_init(&reader_handle);
- while ((ret= azread_row(&reader_handle, &error)))
- context->counter++;
+ if (context->how_often_to_read && count % context->how_often_to_read)
+ {
+ azread_init(&reader_handle);
+ while ((ret= azread_row(&reader_handle, &error)))
+ {
+ context->read_counter++;
+ if (timer_temp == FALSE)
+ {
+ goto end;
+ }
+ }
+ }
- if (count % context->how_often_to_write)
+ if (context->how_often_to_write && count % context->how_often_to_write)
{
+ context->write_counter++;
write_row(context->writer);
}
-
/* If the timer is set, and the alarm is not active then end */
- if (timer_alarm == FALSE)
- break;
+#ifdef CRAP
+ pthread_mutex_lock(&timer_alarm_mutex);
+ timer_temp= timer_alarm;
+ pthread_mutex_unlock(&timer_alarm_mutex);
+#endif
+ timer_temp= timer_alarm;
+
+ if (timer_temp == FALSE)
+ {
+ goto end;
+ }
+ count++;
}
+end:
pthread_mutex_lock(&counter_mutex);
thread_counter--;
pthread_cond_signal(&count_threshhold);
pthread_mutex_unlock(&counter_mutex);
- azclose(&reader_handle);
+ if (context->how_often_to_read)
+ azclose(&reader_handle);
mysql_thread_end();
return NULL;
}
-void create_data_file(azio_stream *write_handler, unsigned long long rows)
+void create_data_file(azio_stream *write_handler, unsigned long long rows, unsigned int use_aio)
{
int ret;
unsigned long long x;
if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC|O_BINARY,
- AZ_METHOD_BLOCK)))
+ use_aio ? AZ_METHOD_AIO: AZ_METHOD_BLOCK)))
{
printf("Could not create test file\n");
exit(1);
@@ -339,6 +410,83 @@ unsigned int write_row(azio_stream *s)
pthread_mutex_lock(&row_lock);
azwrite_row(s, buffer, length);
pthread_mutex_unlock(&row_lock);
+
+ return 0;
+}
+
+static struct my_option my_long_options[] =
+{
+ {"concurrency", OPT_CONCURRENCY,
+ "Concurrency to use.",
+ (uchar**) &opt_concurrency, (uchar**) &opt_concurrency,
+ 0, GET_UINT, REQUIRED_ARG, DEFAULT_CONCURRENCY, 0, 0, 0, 0, 0},
+ {"help", '?', "Display this help and exit.", 0, 0, 0, GET_NO_ARG, NO_ARG,
+ 0, 0, 0, 0, 0, 0},
+ {"initial", OPT_INITIAL,
+ "Number of initial rows wich should be loaded.",
+ (uchar**) &opt_initial_rows, (uchar**) &opt_initial_rows,
+ 0, GET_ULL, REQUIRED_ARG, DEFAULT_INITIAL_LOAD, 0, 0, 0, 0, 0},
+ {"read", OPT_READ,
+ "Ratio of readers.",
+ (uchar**) &opt_to_read, (uchar**) &opt_to_read,
+ 0, GET_UINT, REQUIRED_ARG, DEFAULT_READ_RATIO, 0, 0, 0, 0, 0},
+ {"timer", OPT_TIMER,
+ "Number of seconds test should run.",
+ (uchar**) &opt_seconds, (uchar**) &opt_seconds,
+ 0, GET_UINT, REQUIRED_ARG, DEFAULT_EXECUTE_SECONDS, 0, 0, 0, 0, 0},
+ {"version", 'V', "Output version information and exit.", 0, 0, 0, GET_NO_ARG,
+ NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"write", OPT_WRITE,
+ "Ratio of writers.",
+ (uchar**) &opt_to_write, (uchar**) &opt_to_write,
+ 0, GET_UINT, REQUIRED_ARG, DEFAULT_WRITE_RATIO, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
+};
+
+static void print_version(void)
+{
+ printf("%s Ver %s Distrib %s, for %s (%s)\n",my_progname, "1.0",
+ MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE);
+}
+
+
+static void usage(void)
+{
+ print_version();
+ puts("Copyright (C) 2005 MySQL AB");
+ puts("This software comes with ABSOLUTELY NO WARRANTY. This is free software,\
+ \nand you are welcome to modify and redistribute it under the GPL \
+ license\n");
+ puts("Run a query multiple times against the server\n");
+ printf("Usage: %s [OPTIONS]\n",my_progname);
+ print_defaults("my",load_default_groups);
+ my_print_help(my_long_options);
+}
+
+static my_bool
+get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
+ char *argument __attribute__((unused)))
+{
+ switch(optid) {
+ case 'V':
+ print_version();
+ exit(0);
+ break;
+ case '?':
+ case 'I': /* Info */
+ usage();
+ exit(0);
+ }
+ return 0;
+}
+
+static int
+get_options(int *argc,char ***argv)
+{
+ int ho_error;
+
+ if ((ho_error= handle_options(argc, argv, my_long_options, get_one_option)))
+ exit(ho_error);
return 0;
}
| Thread |
|---|
| • bk commit into 5.2 tree (brian:1.2647) | Brian Aker | 23 Oct |