Below is the list of changes that have just been committed into a local
5.1 repository of andrey. When andrey does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2234 06/03/29 17:41:03 andrey@lmy004. +13 -0
Fix for bug#17619 (Scheduler race conditions)
This is preliminary patch intended to have early review in the fixing process. The issue is big
so this needs few revies during development.
sql/event_manager.h
1.1 06/03/29 17:40:52 andrey@lmy004. +143 -0
sql/event_manager.h
1.0 06/03/29 17:40:52 andrey@lmy004. +0 -0
BitKeeper file /work/mysql-5.1-bug17619-new/sql/event_manager.h
sql/event_manager.cc
1.1 06/03/29 17:40:51 andrey@lmy004. +1052 -0
sql/sql_yacc.yy
1.485 06/03/29 17:40:51 andrey@lmy004. +10 -0
- parse SHOW EVENTS TEST
sql/event_manager.cc
1.0 06/03/29 17:40:51 andrey@lmy004. +0 -0
BitKeeper file /work/mysql-5.1-bug17619-new/sql/event_manager.cc
sql/sql_parse.cc
1.535 06/03/29 17:40:50 andrey@lmy004. +10 -1
- run scheduler tests on SQLCOM_SHOW_EVENTS_TEST
- let kill_one_thread() return whether there was an error or not
sql/sql_lex.h
1.225 06/03/29 17:40:50 andrey@lmy004. +1 -1
add new command SQLCOM_SHOW_EVENTS_TEST, temporarily, for inside testing of the scheduler
sql/mysql_priv.h
1.388 06/03/29 17:40:50 andrey@lmy004. +1 -1
let kill_one_thread() return whether there was an error or not
sql/lex.h
1.157 06/03/29 17:40:50 andrey@lmy004. +1 -0
- add TEST as TEST_SYM. Needed for SHOW EVENTS TEST which is temporary code for testing
the scheduler from inside the server with 1 command.
sql/event_timed.cc
1.48 06/03/29 17:40:49 andrey@lmy004. +17 -0
Event_timed::kill_thread() stub, to implemented later, used by the
current Event_scheduler_manager code
sql/event_priv.h
1.21 06/03/29 17:40:49 andrey@lmy004. +6 -0
export db_find_event() and db_create_event()
sql/event_executor.cc
1.42 06/03/29 17:40:49 andrey@lmy004. +10 -1
- init and destroy the LOCK_manager mutex of Event_schedule_manager
sql/event.h
1.28 06/03/29 17:40:48 andrey@lmy004. +17 -4
move sortcmp_lex_string() so it is visible in the newly implemented
Event_timed::same_name()
sql/event.cc
1.38 06/03/29 17:40:48 andrey@lmy004. +2 -2
export db_create_event() and db_find_event()
sql/Makefile.am
1.134 06/03/29 17:40:48 andrey@lmy004. +2 -2
add a new file event_manager.cc to the build with supplementary event_manager.h
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: andrey
# Host: lmy004.
# Root: /work/mysql-5.1-bug17619-new
--- 1.133/sql/Makefile.am 2006-03-20 20:16:49 +01:00
+++ 1.134/sql/Makefile.am 2006-03-29 17:40:48 +02:00
@@ -65,7 +65,7 @@ noinst_HEADERS = item.h item_func.h item
sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \
parse_file.h sql_view.h sql_trigger.h \
sql_array.h sql_cursor.h event.h event_priv.h \
- sql_plugin.h authors.h sql_partition.h \
+ event_manager.h sql_plugin.h authors.h sql_partition.h \
partition_info.h partition_element.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -100,7 +100,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.
tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
sp_cache.cc parse_file.cc sql_trigger.cc \
- event_executor.cc event.cc event_timed.cc \
+ event_executor.cc event.cc event_timed.cc event_manager.cc\
sql_plugin.cc sql_binlog.cc \
handlerton.cc sql_tablespace.cc partition_info.cc
EXTRA_mysqld_SOURCES = ha_innodb.cc ha_berkeley.cc ha_archive.cc \
--- 1.156/sql/lex.h 2006-03-20 20:36:13 +01:00
+++ 1.157/sql/lex.h 2006-03-29 17:40:50 +02:00
@@ -521,6 +521,7 @@ static SYMBOL symbols[] = {
{ "TEMPORARY", SYM(TEMPORARY)},
{ "TEMPTABLE", SYM(TEMPTABLE_SYM)},
{ "TERMINATED", SYM(TERMINATED)},
+ { "TEST", SYM(TEST_SYM)},
{ "TEXT", SYM(TEXT_SYM)},
{ "THAN", SYM(THAN_SYM)},
{ "THEN", SYM(THEN_SYM)},
--- 1.387/sql/mysql_priv.h 2006-03-22 07:57:50 +01:00
+++ 1.388/sql/mysql_priv.h 2006-03-29 17:40:50 +02:00
@@ -83,7 +83,7 @@ char *sql_strmake_with_convert(const cha
CHARSET_INFO *from_cs,
uint32 max_res_length,
CHARSET_INFO *to_cs, uint32 *result_length);
-void kill_one_thread(THD *thd, ulong id, bool only_kill_query);
+uint kill_one_thread(THD *thd, ulong id, bool only_kill_query);
bool net_request_file(NET* net, const char* fname);
char* query_table_status(THD *thd,const char *db,const char *table_name);
--- 1.224/sql/sql_lex.h 2006-03-20 20:39:47 +01:00
+++ 1.225/sql/sql_lex.h 2006-03-29 17:40:50 +02:00
@@ -111,7 +111,7 @@ enum enum_sql_command {
SQLCOM_SHOW_AUTHORS, SQLCOM_BINLOG_BASE64_EVENT,
SQLCOM_SHOW_PLUGINS,
SQLCOM_CREATE_EVENT, SQLCOM_ALTER_EVENT, SQLCOM_DROP_EVENT,
- SQLCOM_SHOW_CREATE_EVENT, SQLCOM_SHOW_EVENTS,
+ SQLCOM_SHOW_CREATE_EVENT, SQLCOM_SHOW_EVENTS, SQLCOM_SHOW_EVENTS_TEST,
/* This should be the last !!! */
--- 1.534/sql/sql_parse.cc 2006-03-21 13:10:09 +01:00
+++ 1.535/sql/sql_parse.cc 2006-03-29 17:40:50 +02:00
@@ -3860,6 +3860,13 @@ end_with_restore_list:
res= evex_show_create_event(thd, lex->spname, lex->et->definer);
break;
}
+#ifndef DBUG_OFF
+ case SQLCOM_SHOW_EVENTS_TEST:
+ {
+ res= Event_scheduler_manager::run_tests(thd);
+ break;
+ }
+#endif
case SQLCOM_CREATE_FUNCTION: // UDF function
{
if (check_access(thd,INSERT_ACL,"mysql",0,1,0,0))
@@ -6872,7 +6879,7 @@ bool reload_acl_and_cache(THD *thd, ulon
This is written such that we have a short lock on LOCK_thread_count
*/
-void kill_one_thread(THD *thd, ulong id, bool only_kill_query)
+uint kill_one_thread(THD *thd, ulong id, bool only_kill_query)
{
THD *tmp;
uint error=ER_NO_SUCH_THREAD;
@@ -6906,6 +6913,8 @@ void kill_one_thread(THD *thd, ulong id,
send_ok(thd);
else
my_error(error, MYF(0), id);
+
+ return error;
}
--- 1.484/sql/sql_yacc.yy 2006-03-21 13:10:10 +01:00
+++ 1.485/sql/sql_yacc.yy 2006-03-29 17:40:51 +02:00
@@ -630,6 +630,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
%token TEMPORARY
%token TEMPTABLE_SYM
%token TERMINATED
+%token TEST_SYM
%token TEXT_STRING
%token TEXT_SYM
%token TIMESTAMP
@@ -8149,6 +8150,15 @@ show_param:
lex->select_lex.db= $3;
if (prepare_schema_table(YYTHD, lex, 0, SCH_EVENTS))
YYABORT;
+ }
+ | EVENTS_SYM TEST_SYM
+ {
+#ifdef DBUG_OFF
+ yyerror(ER(ER_SYNTAX_ERROR));
+ YYABORT;
+#else
+ Lex->sql_command= SQLCOM_SHOW_EVENTS_TEST;
+#endif
}
| TABLE_SYM STATUS_SYM opt_db wild_and_where
{
--- 1.37/sql/event.cc 2006-02-28 20:32:30 +01:00
+++ 1.38/sql/event.cc 2006-03-29 17:40:48 +02:00
@@ -721,7 +721,7 @@ trunc_err:
db_update_event. The name of the event is inside "et".
*/
-static int
+int
db_create_event(THD *thd, Event_timed *et, my_bool create_if_not,
uint *rows_affected)
{
@@ -954,7 +954,7 @@ err:
2) tbl is not closed at exit
*/
-static int
+int
db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett,
TABLE *tbl, MEM_ROOT *root)
{
--- 1.27/sql/event.h 2006-03-16 13:14:32 +01:00
+++ 1.28/sql/event.h 2006-03-29 17:40:48 +02:00
@@ -19,6 +19,7 @@
#include "sp.h"
#include "sp_head.h"
+#include "event_manager.h"
#define EVEX_OK SP_OK
#define EVEX_KEY_NOT_FOUND SP_KEY_NOT_FOUND
@@ -75,6 +76,8 @@ enum evex_table_field
EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */
} ;
+int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs);
+
class Event_timed
{
Event_timed(const Event_timed &); /* Prevent use of these */
@@ -286,6 +289,20 @@ public:
delete sphead;
sphead= 0;
}
+
+ bool
+ same_name(Event_timed *etn)
+ {
+ return (!sortcmp_lex_string(etn->name, name, system_charset_info) &&
+ !sortcmp_lex_string(etn->dbname, dbname, system_charset_info) &&
+ !sortcmp_lex_string(etn->definer, definer, system_charset_info)
+ );
+ }
+
+ int
+ kill_thread();
+
+
protected:
bool
change_security_context(THD *thd, Security_context *s_ctx,
@@ -314,8 +331,6 @@ evex_open_event_table(THD *thd, enum thr
int
evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer);
-int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs);
-
int
event_reconstruct_interval_expression(String *buf,
interval_type interval,
@@ -335,8 +350,6 @@ shutdown_events();
// auxiliary
int
event_timed_compare(Event_timed **a, Event_timed **b);
-
-
/*
CREATE TABLE event (
--- 1.41/sql/event_executor.cc 2006-03-01 04:21:57 +01:00
+++ 1.42/sql/event_executor.cc 2006-03-29 17:40:49 +02:00
@@ -17,6 +17,7 @@
#include "event_priv.h"
#include "event.h"
#include "sp.h"
+#include "event_manager.h"
#define WAIT_STATUS_READY 0
#define WAIT_STATUS_EMPTY_QUEUE 1
@@ -108,6 +109,7 @@ evex_init_mutexes()
return;
evex_mutexes_initted= TRUE;
+ pthread_mutex_init(&Event_scheduler_manager::LOCK_manager, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_event_arrays, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_workers_count, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST);
@@ -237,10 +239,17 @@ shutdown_events()
evex_mutexes_initted= FALSE;
VOID(pthread_mutex_lock(&LOCK_evex_running));
VOID(pthread_mutex_unlock(&LOCK_evex_running));
-
+
+ if (Event_scheduler_manager::destroy())
+ {
+ DBUG_ASSERT(0);
+ sql_print_error("SCHEDULER: Trying to destroy while still running!");
+ }
pthread_mutex_destroy(&LOCK_event_arrays);
pthread_mutex_destroy(&LOCK_workers_count);
pthread_mutex_destroy(&LOCK_evex_running);
+ pthread_mutex_destroy(&Event_scheduler_manager::LOCK_manager);
+
}
DBUG_VOID_RETURN;
}
--- New file ---
+++ sql/event_manager.cc 06/03/29 17:40:51
/* Copyright (C) 2004-2005 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "event_priv.h"
#include "event.h"
/*
Todo:
1. Empty queue at the end of ::run()
*/
#define CHECK_RUNNING(__ret_code) \
if (mode != MANAGER_RUNNING) \
{ \
DBUG_PRINT("info", ("manager not running but %d. doing nothing", mode)); \
pthread_mutex_unlock(&LOCK_mutex); \
DBUG_RETURN(__ret_code); \
}
Event_scheduler_manager
*Event_scheduler_manager::ston= NULL;
pthread_mutex_t
Event_scheduler_manager::LOCK_manager;
/*
Inits an scheduler thread handler, both the main and a worker
SYNOPSIS
init_event_thread()
thd - the THD of the thread. Has to be allocated by the caller.
NOTES
1. The host of the thead is my_localhost
2. thd->net is initted with NULL - no communication.
Returns
0 OK
-1 Error
*/
static int
init_event_thread(THD* thd)
{
DBUG_ENTER("init_event_thread");
thd->client_capabilities= 0;
thd->security_ctx->master_access= 0;
thd->security_ctx->db_access= 0;
thd->security_ctx->host_or_ip= (char*)my_localhost;
my_net_init(&thd->net, 0);
thd->net.read_timeout = slave_net_timeout;
thd->slave_thread= 0;
thd->options|= OPTION_AUTO_IS_NULL;
thd->client_capabilities= CLIENT_LOCAL_FILES;
thd->real_id=pthread_self();
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->thread_id= thread_id++;
threads.append(thd);
thread_count++;
thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
if (init_thr_lock() || thd->store_globals())
{
thd->cleanup();
delete thd;
DBUG_RETURN(-1);
}
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->proc_info= "Initialized";
thd->version= refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
/*
Inits the main manager thread and then calls Event_scheduler_manager::run()
of arg.
SYNOPSIS
event_scheduler_run_proxy()
arg void* ptr to Event_scheduler_manager
NOTES
1. The host of the thead is my_localhost
2. thd->net is initted with NULL - no communication.
Returns
0 OK
-1 Error
*/
pthread_handler_t
event_scheduler_run_proxy(void *arg)
{
THD *thd= NULL; // needs to be first for thread_stack
Event_scheduler_manager *esm= (Event_scheduler_manager *) arg;
DBUG_ENTER("event_scheduler_run_proxy");
my_thread_init();
/* note that contructor of THD uses DBUG_ ! */
if (!(thd = new THD))
{
sql_print_error("SCHEDULER: Cannot create THD for the main thread.");
goto finish;
}
thd->thread_stack = (char*)&thd; // remember where our stack is
pthread_detach_this_thread();
if (init_event_thread(thd))
{
sql_print_error("SCHEDULER: Cannot init main event thread.");
goto finish;
}
/*
Guarantees that we will see the thread in SHOW PROCESSLIST though its
vio is NULL.
*/
thd->system_thread= 1;
thd->security_ctx->user= my_strdup("event_scheduler", MYF(0));
sql_print_information("SCHEDULER: Main thread started");
esm->run(thd);
/*
NOTE: Don't touch `esm` after this point because we have notified the
thread which shuts us down that we have finished cleaning. In this
very moment a new manager thread could be started and a crash is not
welcome.
*/
THD_CHECK_SENTRY(thd);
finish:
/*
If we cannot create THD then don't decrease because we haven't touched
thread_count and thread_running in init_event_thread() which was never
called. In init_event_thread() thread_count and thread_running are
always increased even in the case the method returns an error.
*/
if (thd)
{
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
pthread_mutex_unlock(&LOCK_thread_count);
thd->proc_info = "Clearing";
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net);
THD_CHECK_SENTRY(thd);
delete thd;
}
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here
}
/*
Adds an event to the executor queue
Synopsis
Event_scheduler_manager::add_event()
et The event to add
Returns
FALSE OK
TRUE Failure
*/
Event_scheduler_manager*
Event_scheduler_manager::get_instance()
{
Event_scheduler_manager *tmp= NULL;
DBUG_ENTER("Event_scheduler_manager::get_instance");
if (sizeof(my_time_t) != sizeof(time_t))
{
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler will not work correctly. Stopping.");
DBUG_ASSERT(0);
DBUG_RETURN(NULL);
}
pthread_mutex_lock(&LOCK_manager);
if (ston)
goto skip_init;
/* Check it rarely (when creating instance) but do check it */
check_system_tables();
tmp= new Event_scheduler_manager;
if (init_queue_ex(&tmp->queue, 30 /*num_el*/, 0 /*offset*/,
0 /*smallest_on_top*/, event_timed_compare_q,
NULL, 30 /*auto_extent*/))
{
sql_print_error("SCHEDULER: Insufficient memory to initialize "
"executing queue.");
goto err;
}
if (pthread_mutex_init(&tmp->LOCK_mutex, MY_MUTEX_INIT_FAST))
{
sql_print_error("SCHEDULER: Unable to initalize lock LOCK_mutex");
goto err;
}
if (pthread_cond_init(&tmp->COND_new_work, NULL))
{
sql_print_error("SCHEDULER: Unable to initialize COND_new_work");
goto err;
}
if (pthread_cond_init(&tmp->COND_started, NULL))
{
sql_print_error("SCHEDULER: Unable to initialize COND_started");
goto err;
}
if (pthread_cond_init(&tmp->COND_shutdown, NULL))
{
sql_print_error("SCHEDULER: Unable to initialize COND_shutdown");
goto err;
}
/* init memory root */
init_alloc_root(&tmp->manager_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
tmp->mode= MANAGER_INIT;
ston= tmp;
skip_init:
pthread_mutex_unlock(&LOCK_manager);
DBUG_RETURN(ston);
err:
pthread_mutex_unlock(&LOCK_manager);
delete tmp;
DBUG_RETURN(NULL);
}
/*
Adds an event to the executor queue
Synopsis
Event_scheduler_manager::add_event()
et The event to add
Returns
FALSE OK
TRUE Failure
*/
bool
Event_scheduler_manager::add_event(THD *thd, Event_timed *et,
bool check_existance)
{
Event_timed *et_new;
DBUG_ENTER("Event_scheduler_manager::add_event");
pthread_mutex_lock(&LOCK_mutex);
CHECK_RUNNING(false);
if (check_existance)
if (find_event(et, false))
{
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(true);
}
/* We need to load the event on manager_root */
if ((et_new= load_and_compile_event(thd, et)))
{
queue_insert_safe(&queue, (byte *) et_new);
pthread_cond_signal(&COND_new_work);
}
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(!et_new);
}
/*
Drops an event from the executor queue
Synopsis
Event_scheduler_manager::drop_event()
etn The event to drop
mode Wait the event or kill&drop
Returns
FALSE OK
TRUE Failure
*/
bool
Event_scheduler_manager::drop_event(THD *thd, Event_timed *etn,
enum event_drop_mode drop_mode)
{
Event_timed *et;
DBUG_ENTER("Event_scheduler_manager::drop_event");
pthread_mutex_lock(&LOCK_mutex);
CHECK_RUNNING(false);
if ((et= find_event(etn, true)))
et->kill_thread();
else
DBUG_PRINT("info", ("no such event found"));
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(false);
}
/*
Replaces an event in the executor queue
Synopsis
Event_scheduler_manager::replace_event()
et The event to replace(add) into the queue
mode Wait the event or kill&drop
Returns
FALSE OK
TRUE Failure
*/
bool
Event_scheduler_manager::replace_event(THD *thd, Event_timed *et,
enum event_drop_mode replace_mode)
{
Event_timed *et_old, *et_new= NULL;
DBUG_ENTER("Event_scheduler_manager::replace_event");
pthread_mutex_lock(&LOCK_mutex);
CHECK_RUNNING(false);
if ((et_old= find_event(et, true)))
et_old->kill_thread();
else
DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
et->dbname.str, et->name.str));
/* We need to load the event on manager_root */
if ((et_new= load_and_compile_event(thd, et)))
{
queue_insert_safe(&queue, (byte *) et_new);
pthread_cond_signal(&COND_new_work);
}
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(!et_new);
}
/*
Searches for an event in the executor queue
Synopsis
Event_scheduler_manager::find_event()
etn The event to find
remove_from_q If found whether to remove from the Q
Returns
NULL Not found
otherwise Address
*/
Event_timed *
Event_scheduler_manager::find_event(Event_timed *etn, bool remove_from_q)
{
uint i;
DBUG_ENTER("Event_scheduler_manager::find_event");
for (i= 0; i < queue.elements; ++i)
{
Event_timed *et= (Event_timed *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", etn->dbname.str, etn->name.str,
et->dbname.str, et->name.str));
if (etn->same_name(et))
{
if (remove_from_q)
queue_remove(&queue, i);
DBUG_RETURN(et);
}
}
DBUG_RETURN(NULL);
}
extern pthread_attr_t connection_attrib;
/*
Starts the event queue manager
Synopsis
Event_scheduler_manager::start()
Returns
EVENT_OP_OK OK
EVENT_OP_CANTFORK Cannot create a new thread
EVENT_OP_CANTSTART ::run() had problem booting
EVENT_OP_ALREADY_RUNNING Manager already running
*/
enum Event_scheduler_manager::event_manager_op_code
Event_scheduler_manager::start()
{
enum event_manager_op_code ret;
pthread_t th;
DBUG_ENTER("Event_scheduler_manager::start");
pthread_mutex_lock(&LOCK_mutex);
/* If already working or starting don't make another attempt */
if (mode > MANAGER_INIT)
{
DBUG_PRINT("info", ("manager is already running or starting"));
ret= EVENT_OP_ALREADY_RUNNING;
goto finish;
}
/*
Now if another thread calls start it will bail-out because the branch
above will be executed. Thus no two or more child threads will be forked.
We the child thread cannot start for some reason then `mode` is set
to MANAGER_CANTSTART and COND_started is also signaled. In this case we
set `mode` back to MANAGER_INIT so another attempt to start the manager
can be made.
*/
mode= MANAGER_STARTING;
/* Fork */
if (pthread_create(&th, &connection_attrib, event_scheduler_run_proxy,
(void*)this))
{
DBUG_PRINT("error", ("cannot create a new thread"));
mode= MANAGER_INIT;
ret= EVENT_OP_CANTFORK;
goto finish;
}
/* Wait till the child thread has booted (w/ or wo success) */
while (mode != MANAGER_RUNNING && mode != MANAGER_CANTSTART)
pthread_cond_wait(&COND_started, &LOCK_mutex);
/*
If we cannot start for some reason then don't prohibit further attempts.
Set back to MANAGER_INIT.
*/
if (mode == MANAGER_CANTSTART)
{
mode= MANAGER_INIT;
ret= EVENT_OP_CANTSTART;
goto finish;
}
ret= EVENT_OP_OK;
finish:
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(ret);
}
/*
The internal loop of the event queue manager
Synopsis
Event_scheduler_manager::run()
Returns
FALSE OK
TRUE Failure
*/
bool
Event_scheduler_manager::run(THD *thd)
{
int ret;
struct timespec abstime;
DBUG_ENTER("Event_scheduler_manager::run");
/*
Load events from disk. `mode` is currently MANAGER_STARTING. Hence,
there is no race between loading and add_event() because the latter
won't touch the queue until the mode is not MANAGER_RUNNING, which is
set a bit later in this function.
*/
ret= load_events_from_db(thd);
pthread_mutex_lock(&LOCK_mutex);
if (!ret)
{
thread_id= thd->thread_id;
sql_print_information("SCHEDULER: Thread_id %d", thread_id);
mode= MANAGER_RUNNING;
}
else
mode= MANAGER_CANTSTART;
pthread_cond_signal(&COND_started);
pthread_mutex_unlock(&LOCK_mutex);
if (ret)
DBUG_RETURN(true);
abstime.tv_nsec= 0;
while (!thd->killed)
{
TIME time_next;
int t2sleep;
Event_timed *et;
my_bool tmp;
pthread_mutex_lock(&LOCK_mutex);
while (!queue.elements)
pthread_cond_wait(&COND_new_work, &LOCK_mutex);
et= (Event_timed *)queue_top(&queue);
DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
/* This timestamp is in UTC */
abstime.tv_sec= (time_t)TIME_to_timestamp(thd, &et->execute_at, &tmp);
/* Convert the UTC timestamp to the local timezone*/
my_tz_SYSTEM->gmt_sec_to_TIME(&time_next, abstime.tv_sec);
abstime.tv_sec= (time_t) TIME_to_timestamp(thd, &time_next, &tmp);
thd->end_time();
if (abstime.tv_sec > thd->query_start())
{
DBUG_PRINT("info", ("Going to sleep"));
pthread_cond_timedwait(&COND_new_work, &LOCK_mutex, &abstime);
}
else
{
/* Execute the event */
}
pthread_mutex_unlock(&LOCK_mutex);
}
sql_print_information("SCHEDULER: Emptying the queue.");
/* empty the queue */
/* free mamager_root memory but don't destroy the root */
/*
We notify the waiting thread which shutdowns us that we have cleaned.
There are few more instructions to be executed in this pthread but
they don't affect manager structures thus it's safe to signal already
at this point.
*/
pthread_mutex_lock(&LOCK_mutex);
/*
Don't mode set to MANAGER_STOPPED because ::start() checks
(mode > MANAGER_INIT) to decide whether it can fork a new thread or return.
MANAGER_STOPPED is used by the dtor of Event_scheduler_manager, thus
preventing a fork of a new thread while the manager object is being
destroyed.
*/
mode= MANAGER_INIT;
sql_print_information("SCHEDULER: Stopped.");
pthread_cond_signal(&COND_shutdown);
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(false);
}
/*
Shutdowns the event queue manager
Synopsis
Event_scheduler_manager::shutdown()
mode Wait the events or kill them immediately
Returns
EVENT_OP_OK OK
EVENT_OP_CANT_KILL Error during stopping of manager thread
EVENT_OP_NOT_RUNNING Manager not working
*/
enum Event_scheduler_manager::event_manager_op_code
Event_scheduler_manager::shutdown(enum event_drop_mode stop_mode)
{
int ret;
THD *thd= current_thd;
DBUG_ENTER("Event_scheduler_manager::shutdown");
pthread_mutex_lock(&LOCK_mutex);
CHECK_RUNNING(EVENT_OP_NOT_RUNNING);
mode= MANAGER_SHUTDOWN;
if ((ret= kill_one_thread(thd, thread_id, false)))
{
DBUG_PRINT("error", ("Error while killing thread %d errcode=%d",
thread_id, ret));
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(EVENT_OP_CANT_KILL);
}
/* Guarantee we don't catch spurious signals */
while (mode != MANAGER_INIT)
pthread_cond_wait(&COND_shutdown, &LOCK_mutex);
DBUG_PRINT("info", ("Manager thread has cleaned up. Set mode to INIT"));
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(EVENT_OP_OK);
}
/*
Returns the number of elements in the queue
Synopsis
Event_scheduler_manager::events_count()
Returns
-1 Manager not working
>=0 Number of Event_timed objects in the queue
*/
int
Event_scheduler_manager::events_count()
{
int n;
DBUG_ENTER("Event_scheduler_manager::events_count");
pthread_mutex_lock(&LOCK_manager);
n= (mode == MANAGER_RUNNING)? queue.elements:-1;
pthread_mutex_unlock(&LOCK_manager);
DBUG_RETURN(n);
}
/*
Looks for a named event in mysql.event and then loads it from
the table, compiles and inserts it into the cache.
SYNOPSIS
Event_scheduler_manager::load_and_compile_event()
thd THD
etn The name of the event to load and compile on manager's root
RETURN VALUE
NULL Error during compile
otherwise Address
*/
Event_timed *
Event_scheduler_manager::load_and_compile_event(THD *thd, Event_timed *etn)
{
int ret= 0;
MEM_ROOT *tmp_mem_root;
Event_timed *et_new;
Open_tables_state backup;
DBUG_ENTER("Event_scheduler_manager::load_and_compile_event");
DBUG_PRINT("enter", ("name: %s", etn->name.str));
thd->reset_n_backup_open_tables_state(&backup);
/* No need to use my_error() here because db_find_event() has done it */
{
sp_name spn(etn->dbname, etn->name);
ret= db_find_event(thd, &spn, &etn->definer, &et_new, NULL, &manager_root);
}
thd->restore_backup_open_tables_state(&backup);
if (ret)
DBUG_RETURN(NULL);
tmp_mem_root= thd->mem_root;
thd->mem_root= &manager_root;
/*
Allocate on manager_root. If you call without manager_root then sphead
will not be cleared!
*/
if (!(ret= et_new->compile(thd, &manager_root)))
et_new->compute_next_execution_time();
thd->mem_root= tmp_mem_root;
if (ret)
{
delete et_new;
et_new= NULL;
}
DBUG_RETURN(et_new);
}
/*
Loads all ENABLED events from mysql.event into the prioritized
queue. Called during scheduler main thread initialization. Compiles
the events. Creates Event_timed instances for every ENABLED event
from mysql.event.
SYNOPSIS
Event_scheduler_manager::load_events_from_db()
thd - Thread context. Used for memory allocation in some cases.
RETURNS
0 OK
!0 Error
NOTES
Reports the error to the console
*/
int
Event_scheduler_manager::load_events_from_db(THD *thd)
{
TABLE *table;
READ_RECORD read_record_info;
int ret= -1;
uint count= 0;
DBUG_ENTER("evex_load_events_from_db");
pthread_mutex_lock(&LOCK_mutex);
if (mode > MANAGER_STARTING)
{
DBUG_ASSERT(0);
sql_print_error("SCHEDULER: Trying to load events while already running.");
pthread_mutex_unlock(&LOCK_mutex);
DBUG_RETURN(EVEX_GENERAL_ERROR);
}
pthread_mutex_unlock(&LOCK_mutex);
if ((ret= evex_open_event_table(thd, TL_READ, &table)))
{
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
}
init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info)))
{
Event_timed *et;
if (!(et= new Event_timed))
{
DBUG_PRINT("load_events_from_db", ("Out of memory"));
ret= -1;
goto end;
}
DBUG_PRINT("load_events_from_db", ("Loading event from row."));
if ((ret= et->load_from_row(&manager_root, table)))
{
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
goto end;
}
if (et->status != MYSQL_EVENT_ENABLED)
{
DBUG_PRINT("load_events_from_db",("%s is disabled",et->name.str));
delete et;
continue;
}
DBUG_PRINT("load_events_from_db",
("Event %s loaded from row. Time to compile", et->name.str));
switch (ret= et->compile(thd, &manager_root)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
goto end;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
et->dbname.str, et->name.str);
goto end;
default:
break;
}
/* let's find when to be executed */
if (et->compute_next_execution_time())
{
sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
" Skipping", et->dbname.str, et->name.str);
continue;
}
DBUG_PRINT("load_events_from_db", ("Adding to the exec list."));
queue_insert_safe(&queue, (byte *) et);
DBUG_PRINT("load_events_from_db", ("%p %*s",
et, et->name.length,et->name.str));
count++;
}
ret= 0;
end:
end_read_record(&read_record_info);
/* Force close to free memory */
thd->version--;
close_thread_tables(thd);
if (!ret)
sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
/*
Opens mysql.db and mysql.user and checks whether:
1. mysql.db has column Event_priv at column 20 (0 based);
2. mysql.user has column Event_priv at column 29 (0 based);
Synopsis
Event_scheduler_manager::evex_check_system_tables()
*/
void
Event_scheduler_manager::check_system_tables()
{
THD *thd= current_thd;
TABLE_LIST tables;
bool not_used;
Open_tables_state backup;
/* thd is 0x0 during boot of the server. Later it's !=0x0 */
if (!thd)
return;
thd->reset_n_backup_open_tables_state(&backup);
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "db";
tables.lock_type= TL_READ;
if (simple_open_n_lock_tables(thd, &tables))
sql_print_error("Cannot open mysql.db");
else
{
table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, mysql_db_table_fields,
&mysql_db_table_last_check,ER_CANNOT_LOAD_FROM_TABLE);
close_thread_tables(thd);
}
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "user";
tables.lock_type= TL_READ;
if (simple_open_n_lock_tables(thd, &tables))
sql_print_error("Cannot open mysql.db");
else
{
if (tables.table->s->fields < 29 ||
strncmp(tables.table->field[29]->field_name,
STRING_WITH_LEN("Event_priv")))
sql_print_error("mysql.user has no `Event_priv` column at position 29");
close_thread_tables(thd);
}
thd->restore_backup_open_tables_state(&backup);
}
#ifndef DBUG_OFF
#define ERROR_W_MESSAGE(cond, msg) \
{ \
sql_print_information("Testing %s", msg); \
DBUG_PRINT("info",("Testing %s", msg)); \
protocol->prepare_for_resend(); \
protocol->store(msg, strlen(msg), system_charset_info); \
if (!(cond)){ \
protocol->store((char*)STRING_WITH_LEN("ERROR"), system_charset_info); \
ret= protocol->write(); \
goto err; \
} else {\
protocol->store((char*)STRING_WITH_LEN("OK"), system_charset_info); \
ret= protocol->write(); \
if (ret) \
goto err; \
} \
}
bool
Event_scheduler_manager::run_tests(THD *thd)
{
Protocol *protocol= thd->protocol;
Event_scheduler_manager *mgr;
List<Item> field_list;
int ret;
DBUG_ENTER("evex_manager_tester");
field_list.push_back(new Item_empty_string("Test", 100));
field_list.push_back(new Item_empty_string("Result", 5));
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
DBUG_RETURN(true);
Event_timed et;
et.dbname.str= (char *)"test";
et.dbname.length= strlen("test");
et.name.str= (char *)"event_name";
et.name.length= strlen("event_name");
et.definer.str= (char *)"definer@host";
et.definer.length= strlen("definer@host");
et.body.str= (char *) "select 1 from dual";
et.body.length= strlen("select 1 from dual");
et.expression= 1;
et.interval= INTERVAL_MINUTE;
Event_timed et2;
et2.dbname.str= (char *)"test";
et2.dbname.length= strlen("test");
et2.name.str= (char *)"event_name2";
et2.name.length= strlen("event_name2");
et2.definer.str= (char *)"definer@host";
et2.definer.length= strlen("definer@host");
et2.body.str= (char *) "select 2 from dual";
et2.body.length= strlen("select 2 from dual");
et2.expression= 2;
et2.interval= INTERVAL_HOUR;
uint tmp;
/* Force new manager instantiation */
ERROR_W_MESSAGE(!Event_scheduler_manager::destroy(), "Destroy old manager");
ERROR_W_MESSAGE(!Event_scheduler_manager::ston, "Old manager");
mgr= Event_scheduler_manager::get_instance();
ERROR_W_MESSAGE(mgr, "Create new manager");
ERROR_W_MESSAGE((mgr->mode == Event_scheduler_manager::MANAGER_INIT),
"Test manager mode ");
ERROR_W_MESSAGE(mgr->events_count() == -1, "Test count");
ERROR_W_MESSAGE(!db_create_event(thd, &et, false, &tmp),
"Create event on disk");
ERROR_W_MESSAGE(!mgr->start(), "Start manager");
mgr->add_event(thd, &et, false);
ERROR_W_MESSAGE(mgr->events_count() == 2, "Test count wo check");
mgr->add_event(thd, &et, true);
ERROR_W_MESSAGE(mgr->events_count() == 2, "Test count with check");
mgr->add_event(thd, &et, false);
ERROR_W_MESSAGE(mgr->events_count() == 3, "Test count wo check");
mgr->add_event(thd, &et2, true);
ERROR_W_MESSAGE(mgr->events_count() == 3, "Test count after add of et2");
ERROR_W_MESSAGE(!db_create_event(thd, &et2, false, &tmp),
"Create event et2 on disk");
mgr->add_event(thd, &et2, true);
ERROR_W_MESSAGE(mgr->events_count() == 4,
"Test count after add of et2 (again)");
mgr->drop_event(thd, &et, Event_scheduler_manager::EVENT_STOP_IMMED);
ERROR_W_MESSAGE(mgr->events_count() == 3, "Test delete 1");
mgr->drop_event(thd, &et, Event_scheduler_manager::EVENT_STOP_IMMED);
ERROR_W_MESSAGE(mgr->events_count() == 2, "Test delete 2");
mgr->drop_event(thd, &et, Event_scheduler_manager::EVENT_STOP_IMMED);
ERROR_W_MESSAGE(mgr->events_count() == 1, "Test delete 3");
et2.body.str= (char *) "select 212 from dual";
et2.body.length= strlen("select 212 from dual");
mgr->replace_event(thd, &et2, Event_scheduler_manager::EVENT_STOP_IMMED);
ERROR_W_MESSAGE(mgr->events_count() == 1, "Test delete 4");
/*
Here we will get an empty line because kill_one_thread() which is
used by ::shutdown() calls send_eof().
*/
ERROR_W_MESSAGE(!mgr->shutdown(EVENT_STOP_IMMED), "Shutdown the manager");
ERROR_W_MESSAGE(!mgr->start(), "Start manager again and sleep 2s");
ERROR_W_MESSAGE(mgr->start()==
Event_scheduler_manager::EVENT_OP_ALREADY_RUNNING,
"Try to start again");
/* now there should be 2 events */
sleep(15);
/*
Here we will get an empty line because kill_one_thread() which is
used by ::shutdown() calls send_eof().
*/
ERROR_W_MESSAGE(!mgr->shutdown(EVENT_STOP_IMMED), "Shutdown manager");
ERROR_W_MESSAGE(mgr->shutdown(EVENT_STOP_IMMED)==
Event_scheduler_manager::EVENT_OP_NOT_RUNNING,
"Shutdown manager again for checking");
ERROR_W_MESSAGE(!Event_scheduler_manager::destroy(), "Destroy new manager");
ERROR_W_MESSAGE(!Event_scheduler_manager::ston, "Singleton");
ERROR_W_MESSAGE(!et.drop(thd), "et.drop()");
ERROR_W_MESSAGE(!et2.drop(thd), "et2.drop()");
ERROR_W_MESSAGE(TRUE, "All Tests");
send_eof(thd);
DBUG_RETURN(false);
err:
mgr->shutdown(EVENT_STOP_IMMED);
Event_scheduler_manager::destroy();
send_eof(thd);
DBUG_RETURN(true);
}
#endif
--- New file ---
+++ sql/event_manager.h 06/03/29 17:40:52
#ifndef _EVENT_MANAGER_H_
#define _EVENT_MANAGER_H_
class THD;
class Event_scheduler_manager
{
public:
static pthread_mutex_t LOCK_manager;
enum event_drop_mode
{
EVENT_STOP_IMMED = 1,
EVENT_STOP_WAIT =2
};
enum event_manager_mode
{
MANAGER_INIT= 0,
MANAGER_STARTING,
MANAGER_CANTSTART,
MANAGER_RUNNING,
MANAGER_SHUTDOWN,
MANAGER_STOPPED,
};
enum event_manager_op_code
{
EVENT_OP_OK = 0,
EVENT_OP_CANTFORK,
EVENT_OP_CANTSTART,
EVENT_OP_ALREADY_RUNNING,
EVENT_OP_NOT_RUNNING,
EVENT_OP_CANT_KILL
};
static Event_scheduler_manager*
get_instance();
bool
add_event(THD *thd, Event_timed *et, bool check_existance);
bool
drop_event(THD *thd, Event_timed *etn, enum event_drop_mode drop_mode);
bool
replace_event(THD *thd, Event_timed *et, enum event_drop_mode replace_mode);
/*
LOCKING PROTOCOL: Does not do any locking. You are responsible to lock
*/
Event_timed *
find_event(Event_timed *etn, bool remove_from_q);
bool
trigger_event(THD *thd, Event_timed *etn) { DBUG_ASSERT(0);}
int
events_count();
enum event_manager_op_code
start();
bool
run(THD *thd);
enum event_manager_op_code
shutdown(enum event_drop_mode stop_mode);
/*
This method is pretty dangerous and we use it only for
testing purposes.
*/
static bool
destroy()
{
pthread_mutex_lock(&LOCK_manager);
if (ston && ston->mode == MANAGER_RUNNING)
{
/* the master thread should not be running */
DBUG_ASSERT(0);
return true;
}
delete ston;
ston= NULL;
pthread_mutex_unlock(&LOCK_manager);
return false;
}
#ifndef DBUG_OFF
static bool
run_tests(THD *thd);
#endif
protected:
/* helper functions */
static void
check_system_tables();
Event_timed *
load_and_compile_event(THD *thd, Event_timed *etn);
int
load_events_from_db(THD *thd);
private:
/* Prevent use of these */
Event_scheduler_manager(const Event_scheduler_manager &);
void operator=(Event_scheduler_manager &);
Event_scheduler_manager():thread_id(0)
{}
/*
LOCK PROCOTOL: Expects that this method is synchronized
*/
~Event_scheduler_manager()
{
/* ToDo: Wait if there are elements on the queue */
pthread_mutex_lock(&LOCK_mutex);
ston->mode= MANAGER_SHUTDOWN;
pthread_mutex_unlock(&LOCK_mutex);
delete_queue(&queue);
pthread_cond_destroy(&COND_new_work);
pthread_cond_destroy(&COND_started);
mode= MANAGER_STOPPED;
}
static Event_scheduler_manager *ston;
enum event_manager_mode mode;
pthread_mutex_t LOCK_mutex;
ulonglong thread_id;
pthread_cond_t COND_new_work;
pthread_cond_t COND_started;
pthread_cond_t COND_shutdown;
MEM_ROOT manager_root;
QUEUE queue;
};
#endif /* _EVENT_MANAGER_H_ */
--- 1.20/sql/event_priv.h 2006-02-28 18:33:25 +01:00
+++ 1.21/sql/event_priv.h 2006-03-29 17:40:49 +02:00
@@ -46,7 +46,13 @@ event_timed_compare_q(void *vptr, byte*
int db_drop_event(THD *thd, Event_timed *et, bool drop_if_exists,
uint *rows_affected);
+int
+db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett,
+ TABLE *tbl, MEM_ROOT *root);
+int
+db_create_event(THD *thd, Event_timed *et, my_bool create_if_not,
+ uint *rows_affected);
#define EXEC_QUEUE_QUEUE_NAME executing_queue
#define EXEC_QUEUE_DARR_NAME evex_executing_queue
--- 1.47/sql/event_timed.cc 2006-03-17 09:36:32 +01:00
+++ 1.48/sql/event_timed.cc 2006-03-29 17:40:49 +02:00
@@ -1546,3 +1546,20 @@ Event_timed::spawn_unlock(THD *thd)
VOID(pthread_mutex_unlock(&this->LOCK_running));
return ret;
}
+
+
+/*
+ Kills a running event
+ Synopsis
+ Event_timed::kill_thread()
+
+ Returns
+ 0 OK
+ !0 Error
+*/
+
+int
+Event_timed::kill_thread()
+{
+ return false;
+}
| Thread |
|---|
| • bk commit into 5.1 tree (andrey:1.2234) BUG#17619 | ahristov | 29 Mar |