Below is the list of changes that have just been committed into a local
5.1 repository of andrey. When andrey does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-09-05 17:08:00+02:00, andrey@stripped +5 -0
.
sql/event_queue.cc@stripped, 2006-09-05 17:07:54+02:00, andrey@stripped +12 -171
.
sql/event_queue.h@stripped, 2006-09-05 17:07:54+02:00, andrey@stripped +7 -9
.
sql/event_scheduler.cc@stripped, 2006-09-05 17:07:54+02:00, andrey@stripped +4 -3
.
sql/events.cc@stripped, 2006-09-05 17:07:54+02:00, andrey@stripped +153 -7
.
sql/events.h@stripped, 2006-09-05 17:07:54+02:00, andrey@stripped +3 -0
.
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: andrey
# Host: example.com
# Root: /work/mysql-5.1-runtime-wl3337
--- 1.64/sql/events.cc 2006-09-05 17:08:10 +02:00
+++ 1.65/sql/events.cc 2006-09-05 17:08:10 +02:00
@@ -352,16 +352,24 @@ Events::create_event(THD *thd, Event_par
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists)))
{
- if ((ret= event_queue->create_event(thd, parse_data->dbname,
- parse_data->name)))
+ Event_queue_element *new_element;
+
+ if (!(new_element= new Event_queue_element()))
+ ret= TRUE; // OOM
+ else if ((ret= db_repository->load_named_event(thd, parse_data->dbname,
+ parse_data->name,
+ new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
- my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
+ delete new_element;
}
+ else
+ event_queue->create_event(thd, new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
+
}
@@ -388,6 +396,7 @@ bool
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
{
int ret;
+ Event_queue_element *new_element;
DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to ? &rename_to->m_db : NULL;
LEX_STRING *new_name= rename_to ? &rename_to->m_name : NULL;
@@ -401,12 +410,20 @@ Events::update_event(THD *thd, Event_par
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name)))
{
- if ((ret= event_queue->update_event(thd, parse_data->dbname,
- parse_data->name, new_dbname, new_name)))
+ LEX_STRING dbname= new_dbname ? *new_dbname : parse_data->dbname;
+ LEX_STRING name= new_name ? *new_name : parse_data->name;
+
+ if (!(new_element= new Event_queue_element()))
+ ret= TRUE; // OOM
+ else if ((ret= db_repository->load_named_event(thd, dbname, name,
+ new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
- my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
+ delete new_element;
}
+ else
+ event_queue->update_event(thd, parse_data->dbname, parse_data->name,
+ new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
@@ -656,11 +673,12 @@ Events::init()
}
check_system_tables_error= FALSE;
- if (event_queue->init_queue(thd, db_repository))
+ if (event_queue->init_queue(thd, db_repository) || load_events_from_db(thd))
{
sql_print_error("SCHEDULER: Error while loading from disk.");
goto end;
}
+
scheduler->init_scheduler(event_queue);
DBUG_ASSERT(opt_event_scheduler == Events::EVENTS_ON ||
@@ -914,5 +932,133 @@ Events::check_system_tables(THD *thd)
thd->restore_backup_open_tables_state(&backup);
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Loads all ENABLED events from mysql.event into the prioritized
+ queue. Called during scheduler main thread initialization. Compiles
+ the events. Creates Event_queue_element instances for every ENABLED event
+ from mysql.event.
+
+ SYNOPSIS
+ Event_queue::load_events_from_db()
+ thd Thread context. Used for memory allocation in some cases.
+
+ RETURN VALUE
+ 0 OK
+ !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
+ EVEX_COMPILE_ERROR) - in all these cases mysql.event was
+ tampered.
+
+ NOTES
+ Reports the error to the console
+*/
+
+int
+Events::load_events_from_db(THD *thd)
+{
+ TABLE *table;
+ READ_RECORD read_record_info;
+ int ret= -1;
+ uint count= 0;
+ bool clean_the_queue= TRUE;
+
+ DBUG_ENTER("Event_queue::load_events_from_db");
+ DBUG_PRINT("enter", ("thd=0x%lx", thd));
+
+ if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
+ {
+ sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
+ DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
+ }
+
+ init_read_record(&read_record_info, thd, table ,NULL,1,0);
+ while (!(read_record_info.read_record(&read_record_info)))
+ {
+ Event_queue_element *et;
+ if (!(et= new Event_queue_element))
+ {
+ DBUG_PRINT("info", ("Out of memory"));
+ break;
+ }
+ DBUG_PRINT("info", ("Loading event from row."));
+
+ if ((ret= et->load_from_row(table)))
+ {
+ sql_print_error("SCHEDULER: Error while loading from mysql.event. "
+ "Table probably corrupted");
+ break;
+ }
+ if (et->status != Event_queue_element::ENABLED)
+ {
+ DBUG_PRINT("info",("%s is disabled",et->name.str));
+ delete et;
+ continue;
+ }
+
+ /* let's find when to be executed */
+ if (et->compute_next_execution_time())
+ {
+ sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
+ " Skipping", et->dbname.str, et->name.str);
+ continue;
+ }
+
+ {
+ Event_job_data temp_job_data;
+ DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
+
+ temp_job_data.load_from_row(table);
+
+ /*
+ We load only on scheduler root just to check whether the body
+ compiles.
+ */
+ switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
+ case EVEX_MICROSECOND_UNSUP:
+ sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
+ "supported but found in mysql.event");
+ break;
+ case EVEX_COMPILE_ERROR:
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
+ et->dbname.str, et->name.str);
+ break;
+ default:
+ break;
+ }
+ thd->end_statement();
+ thd->cleanup_after_query();
+ }
+ if (ret)
+ {
+ delete et;
+ goto end;
+ }
+
+ DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
+ event_queue->create_event(thd, et);
+ count++;
+ }
+ clean_the_queue= FALSE;
+end:
+ end_read_record(&read_record_info);
+
+ if (clean_the_queue)
+ {
+ event_queue->empty_queue();
+ ret= -1;
+ }
+ else
+ {
+ ret= 0;
+ sql_print_information("SCHEDULER: Loaded %d event%s", count,
+ (count == 1)?"":"s");
+ }
+
+ close_thread_tables(thd);
+
+ DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
--- 1.46/sql/events.h 2006-09-05 17:08:10 +02:00
+++ 1.47/sql/events.h 2006-09-05 17:08:10 +02:00
@@ -119,6 +119,9 @@ private:
bool
check_system_tables(THD *thd);
+ int
+ load_events_from_db(THD *thd);
+
/* Singleton DP is used */
Events();
~Events(){}
--- 1.14/sql/event_queue.cc 2006-09-05 17:08:10 +02:00
+++ 1.15/sql/event_queue.cc 2006-09-05 17:08:10 +02:00
@@ -137,7 +137,6 @@ Event_queue::deinit_mutexes()
bool
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
{
- bool res;
struct event_queue_param *event_queue_param_value= NULL;
DBUG_ENTER("Event_queue::init_queue");
@@ -162,12 +161,8 @@ Event_queue::init_queue(THD *thd, Event_
goto err;
}
- res= load_events_from_db(thd);
UNLOCK_QUEUE_DATA();
- if (res)
- deinit_queue();
-
- DBUG_RETURN(res);
+ DBUG_RETURN(FALSE);
err:
UNLOCK_QUEUE_DATA();
@@ -204,37 +199,29 @@ Event_queue::deinit_queue()
Event_queue::create_event()
dbname The schema of the new event
name The name of the new event
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
*/
-int
-Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
+void
+Event_queue::create_event(THD *thd, Event_queue_element *new_element)
{
- int res;
- Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event");
- DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
+ DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd,
+ new_element->dbname.str, new_element->name.str));
- new_element= new Event_queue_element();
- res= db_repository->load_named_event(thd, dbname, name, new_element);
- if (res || new_element->status == Event_queue_element::DISABLED)
+ if (new_element->status == Event_queue_element::DISABLED)
delete new_element;
else
{
new_element->compute_next_execution_time();
+ DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
LOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
dbug_dump_queue(thd->query_start());
pthread_cond_broadcast(&COND_queue_state);
UNLOCK_QUEUE_DATA();
}
-
- DBUG_RETURN(res);
+ DBUG_VOID_RETURN;
}
@@ -248,32 +235,16 @@ Event_queue::create_event(THD *thd, LEX_
name Name of the event
new_schema New schema, in case of RENAME TO, otherwise NULL
new_name New name, in case of RENAME TO, otherwise NULL
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
*/
-int
+void
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- LEX_STRING *new_schema, LEX_STRING *new_name)
+ Event_queue_element *new_element)
{
- int res;
- Event_queue_element *new_element;
-
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
- new_element= new Event_queue_element();
-
- res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
- new_name ? *new_name:name, new_element);
- if (res)
- {
- delete new_element;
- goto end;
- }
- else if (new_element->status == Event_queue_element::DISABLED)
+ if (new_element->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("The event is disabled."));
/*
@@ -300,9 +271,7 @@ Event_queue::update_event(THD *thd, LEX_
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
-end:
- DBUG_PRINT("info", ("res=%d", res));
- DBUG_RETURN(res);
+ DBUG_VOID_RETURN;
}
@@ -449,134 +418,6 @@ Event_queue::find_n_remove_event(LEX_STR
}
DBUG_VOID_RETURN;
-}
-
-
-/*
- Loads all ENABLED events from mysql.event into the prioritized
- queue. Called during scheduler main thread initialization. Compiles
- the events. Creates Event_queue_element instances for every ENABLED event
- from mysql.event.
-
- SYNOPSIS
- Event_queue::load_events_from_db()
- thd - Thread context. Used for memory allocation in some cases.
-
- RETURN VALUE
- 0 OK
- !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
- EVEX_COMPILE_ERROR) - in all these cases mysql.event was
- tampered.
-
- NOTES
- Reports the error to the console
-*/
-
-int
-Event_queue::load_events_from_db(THD *thd)
-{
- TABLE *table;
- READ_RECORD read_record_info;
- int ret= -1;
- uint count= 0;
- bool clean_the_queue= TRUE;
-
- DBUG_ENTER("Event_queue::load_events_from_db");
- DBUG_PRINT("enter", ("thd=0x%lx", thd));
-
- if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
- {
- sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
- DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
- }
-
- init_read_record(&read_record_info, thd, table ,NULL,1,0);
- while (!(read_record_info.read_record(&read_record_info)))
- {
- Event_queue_element *et;
- if (!(et= new Event_queue_element))
- {
- DBUG_PRINT("info", ("Out of memory"));
- break;
- }
- DBUG_PRINT("info", ("Loading event from row."));
-
- if ((ret= et->load_from_row(table)))
- {
- sql_print_error("SCHEDULER: Error while loading from mysql.event. "
- "Table probably corrupted");
- break;
- }
- if (et->status != Event_queue_element::ENABLED)
- {
- DBUG_PRINT("info",("%s is disabled",et->name.str));
- delete et;
- continue;
- }
-
- /* let's find when to be executed */
- if (et->compute_next_execution_time())
- {
- sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
- " Skipping", et->dbname.str, et->name.str);
- continue;
- }
-
- {
- Event_job_data temp_job_data;
- DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
-
- temp_job_data.load_from_row(table);
-
- /*
- We load only on scheduler root just to check whether the body
- compiles.
- */
- switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
- case EVEX_MICROSECOND_UNSUP:
- sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
- "supported but found in mysql.event");
- break;
- case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
- et->dbname.str, et->name.str);
- break;
- default:
- break;
- }
- thd->end_statement();
- thd->cleanup_after_query();
- }
- if (ret)
- {
- delete et;
- goto end;
- }
-
- DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
- queue_insert_safe(&queue, (byte *) et);
- count++;
- }
- clean_the_queue= FALSE;
-end:
- end_read_record(&read_record_info);
-
- if (clean_the_queue)
- {
- empty_queue();
- ret= -1;
- }
- else
- {
- ret= 0;
- sql_print_information("SCHEDULER: Loaded %d event%s", count,
- (count == 1)?"":"s");
- }
-
- close_thread_tables(thd);
-
- DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
- DBUG_RETURN(ret);
}
--- 1.11/sql/event_queue.h 2006-09-05 17:08:10 +02:00
+++ 1.12/sql/event_queue.h 2006-09-05 17:08:10 +02:00
@@ -43,12 +43,12 @@ public:
/* Methods for queue management follow */
- int
- create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
+ void
+ create_event(THD *thd, Event_queue_element *new_element);
- int
+ void
update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- LEX_STRING *new_schema, LEX_STRING *new_name);
+ Event_queue_element *new_element);
void
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
@@ -61,12 +61,12 @@ public:
bool
get_top_for_execution_if_time(THD *thd, Event_job_data **job_data);
+
bool
dump_internal_status(THD *thd);
- int
- load_events_from_db(THD *thd);
-
+ void
+ empty_queue();
protected:
void
find_n_remove_event(LEX_STRING db, LEX_STRING name);
@@ -76,8 +76,6 @@ protected:
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(LEX_STRING, Event_basic *));
- void
- empty_queue();
void
dbug_dump_queue(time_t now);
--- 1.26/sql/event_scheduler.cc 2006-09-05 17:08:10 +02:00
+++ 1.27/sql/event_scheduler.cc 2006-09-05 17:08:10 +02:00
@@ -38,6 +38,7 @@ extern pthread_attr_t connection_attrib;
static
const LEX_STRING scheduler_states_names[] =
{
+ { C_STRING_WITH_LEN("UNINITIALIZED") },
{ C_STRING_WITH_LEN("INITIALIZED") },
{ C_STRING_WITH_LEN("RUNNING") },
{ C_STRING_WITH_LEN("STOPPING") }
@@ -440,8 +441,6 @@ bool
Event_scheduler::run(THD *thd)
{
int res= FALSE;
- struct timespec abstime;
- Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
sql_print_information("SCHEDULER: Manager thread started with id %lu",
@@ -454,6 +453,8 @@ Event_scheduler::run(THD *thd)
while (is_running())
{
+ Event_job_data *job_data;
+
/* Gets a minimized version */
if (queue->get_top_for_execution_if_time(thd, &job_data))
{
@@ -793,7 +794,7 @@ Event_scheduler::dump_internal_status(TH
/* thread_id */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("thread_id"), scs);
- if (thread_id)
+ if (scheduler_thd)
{
int_string.set((longlong) scheduler_thd->thread_id, scs);
protocol->store(&int_string);
| Thread |
|---|
| • bk commit into 5.1 tree (andrey:1.2290) | ahristov | 5 Sep |