Below is the list of changes that have just been committed into a local
5.1 repository of andrey. When andrey does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-09-28 12:49:07+02:00, andrey@stripped +7 -0
Merge ahristov@stripped:/home/bk/mysql-5.1-wl3337
into example.com:/work/mysql-5.1-runtime-wl3337-event_queue_wo_event_db_repository
MERGE: 1.2255.23.17
sql/event_queue.cc@stripped, 2006-09-28 12:49:02+02:00, andrey@stripped +3 -14
manual merge
MERGE: 1.14.1.1
sql/event_queue.h@stripped, 2006-09-28 12:40:59+02:00, andrey@stripped +0 -0
Auto merged
MERGE: 1.11.1.1
sql/event_scheduler.cc@stripped, 2006-09-28 12:49:02+02:00, andrey@stripped +0 -2
manual merfe
MERGE: 1.26.1.1
sql/event_scheduler.h@stripped, 2006-09-28 12:40:59+02:00, andrey@stripped +0 -0
Auto merged
MERGE: 1.17.1.1
sql/events.cc@stripped, 2006-09-28 12:40:59+02:00, andrey@stripped +0 -0
Auto merged
MERGE: 1.64.1.1
sql/events.h@stripped, 2006-09-28 12:40:59+02:00, andrey@stripped +0 -0
Auto merged
MERGE: 1.46.1.1
sql/sql_parse.cc@stripped, 2006-09-28 12:40:59+02:00, andrey@stripped +0 -0
Auto merged
MERGE: 1.572.1.1
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: andrey
# Host: example.com
# Root: /work/mysql-5.1-runtime-wl3337-event_queue_wo_event_db_repository/RESYNC
--- 1.574/sql/sql_parse.cc 2006-09-28 12:49:17 +02:00
+++ 1.575/sql/sql_parse.cc 2006-09-28 12:49:17 +02:00
@@ -3954,8 +3954,7 @@ end_with_restore_list:
if (!(res= Events::get_instance()->drop_event(thd,
lex->spname->m_db,
lex->spname->m_name,
- lex->drop_if_exists,
- FALSE)))
+ lex->drop_if_exists)))
send_ok(thd);
}
break;
--- 1.65/sql/events.cc 2006-09-28 12:49:18 +02:00
+++ 1.66/sql/events.cc 2006-09-28 12:49:18 +02:00
@@ -98,7 +98,7 @@ Event_queue events_event_queue;
static
Event_scheduler events_event_scheduler;
-static
+
Event_db_repository events_event_db_repository;
Events Events::singleton;
@@ -297,29 +297,6 @@ Events::Events()
/*
- Opens mysql.event table with specified lock
-
- SYNOPSIS
- Events::open_event_table()
- thd Thread context
- lock_type How to lock the table
- table We will store the open table here
-
- RETURN VALUE
- 1 Cannot lock table
- 2 The table is corrupted - different number of fields
- 0 OK
-*/
-
-int
-Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
- TABLE **table)
-{
- return db_repository->open_event_table(thd, lock_type, table);
-}
-
-
-/*
The function exported to the world for creating of events.
SYNOPSIS
@@ -352,16 +329,24 @@ Events::create_event(THD *thd, Event_par
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists)))
{
- if ((ret= event_queue->create_event(thd, parse_data->dbname,
- parse_data->name)))
+ Event_queue_element *new_element;
+
+ if (!(new_element= new Event_queue_element()))
+ ret= TRUE; // OOM
+ else if ((ret= db_repository->load_named_event(thd, parse_data->dbname,
+ parse_data->name,
+ new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
- my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
+ delete new_element;
}
+ else
+ event_queue->create_event(thd, new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
+
}
@@ -388,6 +373,7 @@ bool
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
{
int ret;
+ Event_queue_element *new_element;
DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to ? &rename_to->m_db : NULL;
LEX_STRING *new_name= rename_to ? &rename_to->m_name : NULL;
@@ -401,12 +387,20 @@ Events::update_event(THD *thd, Event_par
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name)))
{
- if ((ret= event_queue->update_event(thd, parse_data->dbname,
- parse_data->name, new_dbname, new_name)))
+ LEX_STRING dbname= new_dbname ? *new_dbname : parse_data->dbname;
+ LEX_STRING name= new_name ? *new_name : parse_data->name;
+
+ if (!(new_element= new Event_queue_element()))
+ ret= TRUE; // OOM
+ else if ((ret= db_repository->load_named_event(thd, dbname, name,
+ new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
- my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
+ delete new_element;
}
+ else
+ event_queue->update_event(thd, parse_data->dbname, parse_data->name,
+ new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
@@ -424,10 +418,6 @@ Events::update_event(THD *thd, Event_par
name [in] Event's name
if_exists [in] When set and the event does not exist =>
warning onto the stack
- only_from_disk [in] Whether to remove the event from the queue too.
- In case of Event_job_data::drop() it's needed to
- do only disk drop because Event_queue will handle
- removal from memory queue.
RETURN VALUE
FALSE OK
@@ -435,8 +425,7 @@ Events::update_event(THD *thd, Event_par
*/
bool
-Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
- bool only_from_disk)
+Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists)
{
int ret;
DBUG_ENTER("Events::drop_event");
@@ -449,10 +438,7 @@ Events::drop_event(THD *thd, LEX_STRING
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->drop_event(thd, dbname, name, if_exists)))
- {
- if (!only_from_disk)
- event_queue->drop_event(thd, dbname, name);
- }
+ event_queue->drop_event(thd, dbname, name);
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
}
@@ -656,11 +642,12 @@ Events::init()
}
check_system_tables_error= FALSE;
- if (event_queue->init_queue(thd, db_repository))
+ if (event_queue->init_queue(thd) || load_events_from_db(thd))
{
sql_print_error("SCHEDULER: Error while loading from disk.");
goto end;
}
+
scheduler->init_scheduler(event_queue);
DBUG_ASSERT(opt_event_scheduler == Events::EVENTS_ON ||
@@ -668,6 +655,7 @@ Events::init()
if (opt_event_scheduler == Events::EVENTS_ON)
res= scheduler->start();
+ Event_worker_thread::init(this, db_repository);
end:
delete thd;
/* Remember that we don't have a THD */
@@ -902,5 +890,133 @@ Events::check_system_tables(THD *thd)
thd->restore_backup_open_tables_state(&backup);
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Loads all ENABLED events from mysql.event into the prioritized
+ queue. Called during scheduler main thread initialization. Compiles
+ the events. Creates Event_queue_element instances for every ENABLED event
+ from mysql.event.
+
+ SYNOPSIS
+ Event_queue::load_events_from_db()
+ thd Thread context. Used for memory allocation in some cases.
+
+ RETURN VALUE
+ 0 OK
+ !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
+ EVEX_COMPILE_ERROR) - in all these cases mysql.event was
+ tampered.
+
+ NOTES
+ Reports the error to the console
+*/
+
+int
+Events::load_events_from_db(THD *thd)
+{
+ TABLE *table;
+ READ_RECORD read_record_info;
+ int ret= -1;
+ uint count= 0;
+ bool clean_the_queue= TRUE;
+
+ DBUG_ENTER("Event_queue::load_events_from_db");
+ DBUG_PRINT("enter", ("thd=0x%lx", thd));
+
+ if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
+ {
+ sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
+ DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
+ }
+
+ init_read_record(&read_record_info, thd, table ,NULL,1,0);
+ while (!(read_record_info.read_record(&read_record_info)))
+ {
+ Event_queue_element *et;
+ if (!(et= new Event_queue_element))
+ {
+ DBUG_PRINT("info", ("Out of memory"));
+ break;
+ }
+ DBUG_PRINT("info", ("Loading event from row."));
+
+ if ((ret= et->load_from_row(table)))
+ {
+ sql_print_error("SCHEDULER: Error while loading from mysql.event. "
+ "Table probably corrupted");
+ break;
+ }
+ if (et->status != Event_queue_element::ENABLED)
+ {
+ DBUG_PRINT("info",("%s is disabled",et->name.str));
+ delete et;
+ continue;
+ }
+
+ /* let's find when to be executed */
+ if (et->compute_next_execution_time())
+ {
+ sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
+ " Skipping", et->dbname.str, et->name.str);
+ continue;
+ }
+
+ {
+ Event_job_data temp_job_data;
+ DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
+
+ temp_job_data.load_from_row(table);
+
+ /*
+ We load only on scheduler root just to check whether the body
+ compiles.
+ */
+ switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
+ case EVEX_MICROSECOND_UNSUP:
+ sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
+ "supported but found in mysql.event");
+ break;
+ case EVEX_COMPILE_ERROR:
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
+ et->dbname.str, et->name.str);
+ break;
+ default:
+ break;
+ }
+ thd->end_statement();
+ thd->cleanup_after_query();
+ }
+ if (ret)
+ {
+ delete et;
+ goto end;
+ }
+
+ DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
+ event_queue->create_event(thd, et);
+ count++;
+ }
+ clean_the_queue= FALSE;
+end:
+ end_read_record(&read_record_info);
+
+ if (clean_the_queue)
+ {
+ event_queue->empty_queue();
+ ret= -1;
+ }
+ else
+ {
+ ret= 0;
+ sql_print_information("SCHEDULER: Loaded %d event%s", count,
+ (count == 1)?"":"s");
+ }
+
+ close_thread_tables(thd);
+
+ DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
--- 1.49/sql/events.h 2006-09-28 12:49:18 +02:00
+++ 1.50/sql/events.h 2006-09-28 12:49:18 +02:00
@@ -43,13 +43,6 @@ sortcmp_lex_string(LEX_STRING s, LEX_STR
class Events
{
public:
- /*
- Quite NOT the best practice and will be removed once
- Event_timed::drop() and Event_timed is fixed not do drop directly
- or other scheme will be found.
- */
- friend class Event_queue_element;
-
/* The order should match the order in opt_typelib */
enum enum_opt_event_scheduler
{
@@ -93,15 +86,11 @@ public:
update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to);
bool
- drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
- bool only_from_disk);
+ drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists);
void
drop_schema_events(THD *thd, char *db);
- int
- open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
-
bool
show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
@@ -119,6 +108,9 @@ public:
private:
bool
check_system_tables(THD *thd);
+
+ int
+ load_events_from_db(THD *thd);
/* Singleton DP is used */
Events();
--- 1.18/sql/event_queue.cc 2006-09-28 12:49:18 +02:00
+++ 1.19/sql/event_queue.cc 2006-09-28 12:49:18 +02:00
@@ -17,7 +17,6 @@
#include "mysql_priv.h"
#include "event_queue.h"
#include "event_data_objects.h"
-#include "event_db_repository.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
@@ -137,16 +136,14 @@ Event_queue::deinit_mutexes()
*/
bool
-Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
+Event_queue::init_queue(THD *thd)
{
- bool res;
struct event_queue_param *event_queue_param_value= NULL;
DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this=0x%lx", this));
LOCK_QUEUE_DATA();
- db_repository= db_repo;
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
0 /*max_on_top*/, event_queue_element_compare_q,
@@ -164,12 +161,8 @@ Event_queue::init_queue(THD *thd, Event_
goto err;
}
- res= load_events_from_db(thd);
UNLOCK_QUEUE_DATA();
- if (res)
- deinit_queue();
-
- DBUG_RETURN(res);
+ DBUG_RETURN(FALSE);
err:
UNLOCK_QUEUE_DATA();
@@ -206,37 +199,29 @@ Event_queue::deinit_queue()
Event_queue::create_event()
dbname The schema of the new event
name The name of the new event
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
*/
-int
-Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
+void
+Event_queue::create_event(THD *thd, Event_queue_element *new_element)
{
- int res;
- Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event");
- DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
+ DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd,
+ new_element->dbname.str, new_element->name.str));
- new_element= new Event_queue_element();
- res= db_repository->load_named_event(thd, dbname, name, new_element);
- if (res || new_element->status == Event_queue_element::DISABLED)
+ if (new_element->status == Event_queue_element::DISABLED)
delete new_element;
else
{
new_element->compute_next_execution_time();
+ DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
LOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
dbug_dump_queue(thd->query_start());
pthread_cond_broadcast(&COND_queue_state);
UNLOCK_QUEUE_DATA();
}
-
- DBUG_RETURN(res);
+ DBUG_VOID_RETURN;
}
@@ -250,32 +235,16 @@ Event_queue::create_event(THD *thd, LEX_
name Name of the event
new_schema New schema, in case of RENAME TO, otherwise NULL
new_name New name, in case of RENAME TO, otherwise NULL
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
*/
-int
+void
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- LEX_STRING *new_schema, LEX_STRING *new_name)
+ Event_queue_element *new_element)
{
- int res;
- Event_queue_element *new_element;
-
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
- new_element= new Event_queue_element();
-
- res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
- new_name ? *new_name:name, new_element);
- if (res)
- {
- delete new_element;
- goto end;
- }
- else if (new_element->status == Event_queue_element::DISABLED)
+ if (new_element->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("The event is disabled."));
/*
@@ -302,9 +271,7 @@ Event_queue::update_event(THD *thd, LEX_
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
-end:
- DBUG_PRINT("info", ("res=%d", res));
- DBUG_RETURN(res);
+ DBUG_VOID_RETURN;
}
@@ -455,134 +422,6 @@ Event_queue::find_n_remove_event(LEX_STR
/*
- Loads all ENABLED events from mysql.event into the prioritized
- queue. Called during scheduler main thread initialization. Compiles
- the events. Creates Event_queue_element instances for every ENABLED event
- from mysql.event.
-
- SYNOPSIS
- Event_queue::load_events_from_db()
- thd - Thread context. Used for memory allocation in some cases.
-
- RETURN VALUE
- 0 OK
- !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
- EVEX_COMPILE_ERROR) - in all these cases mysql.event was
- tampered.
-
- NOTES
- Reports the error to the console
-*/
-
-int
-Event_queue::load_events_from_db(THD *thd)
-{
- TABLE *table;
- READ_RECORD read_record_info;
- int ret= -1;
- uint count= 0;
- bool clean_the_queue= TRUE;
-
- DBUG_ENTER("Event_queue::load_events_from_db");
- DBUG_PRINT("enter", ("thd=0x%lx", thd));
-
- if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
- {
- sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
- DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
- }
-
- init_read_record(&read_record_info, thd, table ,NULL,1,0);
- while (!(read_record_info.read_record(&read_record_info)))
- {
- Event_queue_element *et;
- if (!(et= new Event_queue_element))
- {
- DBUG_PRINT("info", ("Out of memory"));
- break;
- }
- DBUG_PRINT("info", ("Loading event from row."));
-
- if ((ret= et->load_from_row(table)))
- {
- sql_print_error("SCHEDULER: Error while loading from mysql.event. "
- "Table probably corrupted");
- break;
- }
- if (et->status != Event_queue_element::ENABLED)
- {
- DBUG_PRINT("info",("%s is disabled",et->name.str));
- delete et;
- continue;
- }
-
- /* let's find when to be executed */
- if (et->compute_next_execution_time())
- {
- sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
- " Skipping", et->dbname.str, et->name.str);
- continue;
- }
-
- {
- Event_job_data temp_job_data;
- DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
-
- temp_job_data.load_from_row(table);
-
- /*
- We load only on scheduler root just to check whether the body
- compiles.
- */
- switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
- case EVEX_MICROSECOND_UNSUP:
- sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
- "supported but found in mysql.event");
- break;
- case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
- et->dbname.str, et->name.str);
- break;
- default:
- break;
- }
- thd->end_statement();
- thd->cleanup_after_query();
- }
- if (ret)
- {
- delete et;
- goto end;
- }
-
- DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
- queue_insert_safe(&queue, (byte *) et);
- count++;
- }
- clean_the_queue= FALSE;
-end:
- end_read_record(&read_record_info);
-
- if (clean_the_queue)
- {
- empty_queue();
- ret= -1;
- }
- else
- {
- ret= 0;
- sql_print_information("SCHEDULER: Loaded %d event%s", count,
- (count == 1)?"":"s");
- }
-
- close_thread_tables(thd);
-
- DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
- DBUG_RETURN(ret);
-}
-
-
-/*
Recalculates activation times in the queue. There is one reason for
that. Because the values (execute_at) by which the queue is ordered are
changed by calls to compute_next_execution_time() on a request from the
@@ -631,7 +470,7 @@ Event_queue::empty_queue()
{
uint i;
DBUG_ENTER("Event_queue::empty_queue");
- DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
+ DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements));
sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
/* empty the queue */
for (i= 0; i < queue.elements; ++i)
@@ -688,38 +527,30 @@ static const char *queue_wait_msg= "Wait
SYNOPSIS
Event_queue::get_top_for_execution_if_time()
- thd [in] Thread
- now [in] Current timestamp
- job_data [out] The object to execute
- abstime [out] Time to sleep
+ thd [in] Thread
+ now [in] Current timestamp
+ event_name [out] The object to execute
RETURN VALUE
- FALSE No error. If *job_data==NULL then top not elligible for execution.
- Could be that there is no top. If abstime->tv_sec is set to value
- greater than zero then use abstime with pthread_cond_timedwait().
- If abstime->tv_sec is zero then sleep with pthread_cond_wait().
- abstime->tv_nsec is always zero.
- TRUE Error
-
+ FALSE No error. event_name != NULL
+ TRUE Serious error
*/
bool
-Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
+Event_queue::get_top_for_execution_if_time(THD *thd,
+ Event_queue_element_for_exec **event_name)
{
bool ret= FALSE;
struct timespec top_time;
struct timespec *abstime;
- Event_queue_element *top= NULL;
- bool to_free= FALSE;
- bool to_drop= FALSE;
- *job_data= NULL;
+ *event_name= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
top_time.tv_nsec= 0;
LOCK_QUEUE_DATA();
for (;;)
{
- int res;
+ Event_queue_element *top= NULL;
thd->end_time();
time_t now= thd->query_start();
@@ -761,39 +592,30 @@ Event_queue::get_top_for_execution_if_ti
continue;
}
- DBUG_PRINT("info", ("Ready for execution"));
- if (!(*job_data= new Event_job_data()))
- {
- ret= TRUE;
- break;
- }
- if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
- *job_data)))
+ if (!(*event_name= new Event_queue_element_for_exec()) ||
+ (*event_name)->init(top->dbname, top->name))
{
- DBUG_PRINT("error", ("Got %d from load_named_event", res));
- delete *job_data;
- *job_data= NULL;
ret= TRUE;
break;
}
+ DBUG_PRINT("info", ("Ready for execution"));
top->mark_last_executed(thd);
if (top->compute_next_execution_time())
top->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));
- (*job_data)->execution_count= top->execution_count;
+ top->execution_count++;
+ (*event_name)->dropped= top->dropped;
top->update_timing_fields(thd);
- if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
- (top->status == Event_queue_element::DISABLED))
+ if (top->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("removing from the queue"));
sql_print_information("SCHEDULER: Last execution of %s.%s. %s",
top->dbname.str, top->name.str,
top->dropped? "Dropping.":"");
- to_free= TRUE;
- to_drop= top->dropped;
+ delete top;
queue_remove(&queue, 0);
}
else
@@ -804,20 +626,13 @@ Event_queue::get_top_for_execution_if_ti
}
end:
UNLOCK_QUEUE_DATA();
- if (to_drop)
- {
- DBUG_PRINT("info", ("Dropping from disk"));
- top->drop(thd);
- }
- if (to_free)
- delete top;
-
+
DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
- ret, *job_data, abstime? abstime->tv_sec:0));
+ ret, *event_name, abstime? abstime->tv_sec:0));
- if (*job_data)
- DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str,
- (*job_data)->name.str, (*job_data)->definer.str));
+ if (*event_name)
+ DBUG_PRINT("info", ("db=%s name=%s", (*event_name)->dbname.str,
+ (*event_name)->name.str));
DBUG_RETURN(ret);
}
--- 1.12/sql/event_queue.h 2006-09-28 12:49:18 +02:00
+++ 1.13/sql/event_queue.h 2006-09-28 12:49:18 +02:00
@@ -17,12 +17,10 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class Event_basic;
-class Event_db_repository;
-class Event_job_data;
class Event_queue_element;
+class Event_queue_element_for_exec;
class THD;
-class Event_scheduler;
class Event_queue
{
@@ -36,19 +34,19 @@ public:
deinit_mutexes();
bool
- init_queue(THD *thd, Event_db_repository *db_repo);
+ init_queue(THD *thd);
void
deinit_queue();
/* Methods for queue management follow */
- int
- create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
+ void
+ create_event(THD *thd, Event_queue_element *new_element);
- int
+ void
update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- LEX_STRING *new_schema, LEX_STRING *new_name);
+ Event_queue_element *new_element);
void
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
@@ -60,14 +58,15 @@ public:
recalculate_activation_times(THD *thd);
bool
- get_top_for_execution_if_time(THD *thd, Event_job_data **job_data);
+ get_top_for_execution_if_time(THD *thd,
+ Event_queue_element_for_exec **event_name);
+
void
dump_internal_status();
- int
- load_events_from_db(THD *thd);
-
+ void
+ empty_queue();
protected:
void
find_n_remove_event(LEX_STRING db, LEX_STRING name);
@@ -77,8 +76,6 @@ protected:
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(LEX_STRING, Event_basic *));
- void
- empty_queue();
void
dbug_dump_queue(time_t now);
@@ -87,11 +84,7 @@ protected:
pthread_mutex_t LOCK_event_queue;
pthread_cond_t COND_queue_state;
- Event_db_repository *db_repository;
-
- Event_scheduler *scheduler;
-
- /* The sorted queue with the Event_job_data objects */
+ /* The sorted queue with the Event_queue_element objects */
QUEUE queue;
TIME next_activation_at;
--- 1.27/sql/event_scheduler.cc 2006-09-28 12:49:18 +02:00
+++ 1.28/sql/event_scheduler.cc 2006-09-28 12:49:18 +02:00
@@ -19,6 +19,7 @@
#include "event_data_objects.h"
#include "event_scheduler.h"
#include "event_queue.h"
+#include "event_db_repository.h"
#ifdef __GNUC__
#if __GNUC__ >= 2
@@ -35,6 +36,11 @@
extern pthread_attr_t connection_attrib;
+
+Event_db_repository *Event_worker_thread::db_repository;
+Events *Event_worker_thread::events_facade;
+
+
static
const LEX_STRING scheduler_states_names[] =
{
@@ -61,8 +67,8 @@ struct scheduler_param {
et The event itself
*/
-static void
-evex_print_warnings(THD *thd, Event_job_data *et)
+void
+Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
{
MYSQL_ERROR *err;
DBUG_ENTER("evex_print_warnings");
@@ -254,48 +260,99 @@ event_worker_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd;
- Event_job_data *event= (Event_job_data *)arg;
- int ret;
+ Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
thd= event->thd;
-
thd->thread_stack= (char *) &thd; // remember where our stack is
- DBUG_ENTER("event_worker_thread");
- if (!post_init_event_thread(thd))
+ Event_worker_thread worker_thread;
+ worker_thread.run(thd, (Event_queue_element_for_exec *)arg);
+
+ deinit_event_thread(thd);
+
+ return 0; // Can't return anything here
+}
+
+
+/*
+ Function that executes an event in a child thread. Setups the
+ environment for the event execution and cleans after that.
+
+ SYNOPSIS
+ Event_worker_thread::run()
+ thd Thread context
+ event The Event_queue_element_for_exec object to be processed
+*/
+
+void
+Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
+{
+ int ret;
+ Event_job_data *job_data= NULL;
+ DBUG_ENTER("Event_worker_thread::run");
+ DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
+ "THD=0x%lx", time(NULL), thd));
+
+ if (post_init_event_thread(thd))
+ goto end;
+
+ if (!(job_data= new Event_job_data()))
+ goto end;
+ else if ((ret= db_repository->
+ load_named_event(thd, event->dbname, event->name, job_data)))
{
- DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
- "THD=0x%lx", time(NULL), thd));
+ DBUG_PRINT("error", ("Got %d from load_named_event", ret));
+ goto end;
+ }
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. ",
+ job_data->dbname.str, job_data->name.str,
+ job_data->definer.str, thd->thread_id);
+
+ thd->enable_slow_log= TRUE;
+
+ ret= job_data->execute(thd);
+
+ print_warnings(thd, job_data);
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
+ "RetCode=%d", job_data->dbname.str, job_data->name.str,
+ job_data->definer.str, thd->thread_id, ret);
+ if (ret == EVEX_COMPILE_ERROR)
+ sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
+ job_data->dbname.str, job_data->name.str,
+ job_data->definer.str);
+ else if (ret == EVEX_MICROSECOND_UNSUP)
+ sql_print_information("SCHEDULER: MICROSECOND is not supported");
- sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. "
- "Execution %u",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id,
- event->execution_count);
-
- thd->enable_slow_log= TRUE;
-
- ret= event->execute(thd);
-
- evex_print_warnings(thd, event);
-
- sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
- "RetCode=%d", event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id, ret);
- if (ret == EVEX_COMPILE_ERROR)
- sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
- event->dbname.str, event->name.str,
- event->definer.str);
- else if (ret == EVEX_MICROSECOND_UNSUP)
- sql_print_information("SCHEDULER: MICROSECOND is not supported");
+end:
+ delete job_data;
+
+ if (event->dropped)
+ {
+ sql_print_information("SCHEDULER: Dropping %s.%s", event->dbname.str,
+ event->name.str);
+ /*
+ Using db_repository can lead to a race condition because we access
+ the table without holding LOCK_metadata.
+ Scenario:
+ 1. CREATE EVENT xyz AT ... (conn thread)
+ 2. execute xyz (worker)
+ 3. CREATE EVENT XYZ EVERY ... (conn thread)
+ 4. drop xyz (worker)
+ 5. XYZ was just created on disk but `drop xyz` of the worker dropped it.
+ A consequent load to create Event_queue_element will fail.
+
+ If all operations are performed under LOCK_metadata there is no such
+ problem. However, this comes at the price of introduction bi-directional
+ association between class Events and class Event_worker_thread.
+ */
+ events_facade->drop_event(thd, event->dbname, event->name, FALSE);
}
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
event->name.str));
- delete event;
- deinit_event_thread(thd);
-
- DBUG_RETURN(0); // Can't return anything here
+ delete event;
}
@@ -441,8 +498,6 @@ bool
Event_scheduler::run(THD *thd)
{
int res= FALSE;
- struct timespec abstime;
- Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
sql_print_information("SCHEDULER: Manager thread started with id %lu",
@@ -455,18 +510,20 @@ Event_scheduler::run(THD *thd)
while (is_running())
{
+ Event_queue_element_for_exec *event_name;
+
/* Gets a minimized version */
- if (queue->get_top_for_execution_if_time(thd, &job_data))
+ if (queue->get_top_for_execution_if_time(thd, &event_name))
{
sql_print_information("SCHEDULER: Serious error during getting next "
"event to execute. Stopping");
break;
}
- DBUG_PRINT("info", ("get_top returned job_data=0x%lx", job_data));
- if (job_data)
+ DBUG_PRINT("info", ("get_top returned job_data=0x%lx", event_name));
+ if (event_name)
{
- if ((res= execute_top(thd, job_data)))
+ if ((res= execute_top(thd, event_name)))
break;
}
else
@@ -500,7 +557,7 @@ Event_scheduler::run(THD *thd)
*/
bool
-Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
+Event_scheduler::execute_top(THD *thd, Event_queue_element_for_exec *event_name)
{
THD *new_thd;
pthread_t th;
@@ -511,13 +568,13 @@ Event_scheduler::execute_top(THD *thd, E
pre_init_event_thread(new_thd);
new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
- job_data->thd= new_thd;
+ event_name->thd= new_thd;
DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
- job_data->dbname.str, job_data->name.str));
+ event_name->dbname.str, event_name->name.str));
/* Major failure */
if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
- job_data)))
+ event_name)))
goto error;
++started_events;
@@ -538,7 +595,7 @@ error:
delete new_thd;
pthread_mutex_unlock(&LOCK_thread_count);
}
- delete job_data;
+ delete event_name;
DBUG_RETURN(TRUE);
}
--- 1.18/sql/event_scheduler.h 2006-09-28 12:49:18 +02:00
+++ 1.19/sql/event_scheduler.h 2006-09-28 12:49:18 +02:00
@@ -16,8 +16,11 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
class Event_queue;
class Event_job_data;
+class Event_db_repository;
+class Events;
void
pre_init_event_thread(THD* thd);
@@ -28,6 +31,29 @@ post_init_event_thread(THD* thd);
void
deinit_event_thread(THD *thd);
+
+class Event_worker_thread
+{
+public:
+ static void
+ init(Events *events, Event_db_repository *db_repo)
+ {
+ db_repository= db_repo;
+ events_facade= events;
+ }
+
+ void
+ run(THD *thd, Event_queue_element_for_exec *event);
+
+private:
+ void
+ print_warnings(THD *thd, Event_job_data *et);
+
+ static Event_db_repository *db_repository;
+ static Events *events_facade;
+};
+
+
class Event_scheduler
{
public:
@@ -72,10 +98,9 @@ private:
uint
workers_count();
-
/* helper functions */
bool
- execute_top(THD *thd, Event_job_data *job_data);
+ execute_top(THD *thd, Event_queue_element_for_exec *event_name);
/* helper functions for working with mutexes & conditionals */
void
| Thread |
|---|
| • bk commit into 5.1 tree (andrey:1.2316) | ahristov | 28 Sep |