From: Date: March 29 2006 5:41pm Subject: bk commit into 5.1 tree (andrey:1.2234) BUG#17619 List-Archive: http://lists.mysql.com/commits/4281 X-Bug: 17619 Message-Id: <20060329154118.1167D2A6F0@andrey.hristov.com> Below is the list of changes that have just been committed into a local 5.1 repository of andrey. When andrey does a push these changes will be propagated to the main repository and, within 24 hours after the push, to the public repository. For information on how to access the public repository see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html ChangeSet 1.2234 06/03/29 17:41:03 andrey@lmy004. +13 -0 Fix for bug#17619 (Scheduler race conditions) This is preliminary patch intended to have early review in the fixing process. The issue is big so this needs few revies during development. sql/event_manager.h 1.1 06/03/29 17:40:52 andrey@lmy004. +143 -0 sql/event_manager.h 1.0 06/03/29 17:40:52 andrey@lmy004. +0 -0 BitKeeper file /work/mysql-5.1-bug17619-new/sql/event_manager.h sql/event_manager.cc 1.1 06/03/29 17:40:51 andrey@lmy004. +1052 -0 sql/sql_yacc.yy 1.485 06/03/29 17:40:51 andrey@lmy004. +10 -0 - parse SHOW EVENTS TEST sql/event_manager.cc 1.0 06/03/29 17:40:51 andrey@lmy004. +0 -0 BitKeeper file /work/mysql-5.1-bug17619-new/sql/event_manager.cc sql/sql_parse.cc 1.535 06/03/29 17:40:50 andrey@lmy004. +10 -1 - run scheduler tests on SQLCOM_SHOW_EVENTS_TEST - let kill_one_thread() return whether there was an error or not sql/sql_lex.h 1.225 06/03/29 17:40:50 andrey@lmy004. +1 -1 add new command SQLCOM_SHOW_EVENTS_TEST, temporarily, for inside testing of the scheduler sql/mysql_priv.h 1.388 06/03/29 17:40:50 andrey@lmy004. +1 -1 let kill_one_thread() return whether there was an error or not sql/lex.h 1.157 06/03/29 17:40:50 andrey@lmy004. +1 -0 - add TEST as TEST_SYM. Needed for SHOW EVENTS TEST which is temporary code for testing the scheduler from inside the server with 1 command. sql/event_timed.cc 1.48 06/03/29 17:40:49 andrey@lmy004. +17 -0 Event_timed::kill_thread() stub, to implemented later, used by the current Event_scheduler_manager code sql/event_priv.h 1.21 06/03/29 17:40:49 andrey@lmy004. +6 -0 export db_find_event() and db_create_event() sql/event_executor.cc 1.42 06/03/29 17:40:49 andrey@lmy004. +10 -1 - init and destroy the LOCK_manager mutex of Event_schedule_manager sql/event.h 1.28 06/03/29 17:40:48 andrey@lmy004. +17 -4 move sortcmp_lex_string() so it is visible in the newly implemented Event_timed::same_name() sql/event.cc 1.38 06/03/29 17:40:48 andrey@lmy004. +2 -2 export db_create_event() and db_find_event() sql/Makefile.am 1.134 06/03/29 17:40:48 andrey@lmy004. +2 -2 add a new file event_manager.cc to the build with supplementary event_manager.h # This is a BitKeeper patch. What follows are the unified diffs for the # set of deltas contained in the patch. The rest of the patch, the part # that BitKeeper cares about, is below these diffs. # User: andrey # Host: lmy004. # Root: /work/mysql-5.1-bug17619-new --- 1.133/sql/Makefile.am 2006-03-20 20:16:49 +01:00 +++ 1.134/sql/Makefile.am 2006-03-29 17:40:48 +02:00 @@ -65,7 +65,7 @@ noinst_HEADERS = item.h item_func.h item sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \ parse_file.h sql_view.h sql_trigger.h \ sql_array.h sql_cursor.h event.h event_priv.h \ - sql_plugin.h authors.h sql_partition.h \ + event_manager.h sql_plugin.h authors.h sql_partition.h \ partition_info.h partition_element.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ @@ -100,7 +100,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler. tztime.cc my_time.c my_user.c my_decimal.cc\ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ sp_cache.cc parse_file.cc sql_trigger.cc \ - event_executor.cc event.cc event_timed.cc \ + event_executor.cc event.cc event_timed.cc event_manager.cc\ sql_plugin.cc sql_binlog.cc \ handlerton.cc sql_tablespace.cc partition_info.cc EXTRA_mysqld_SOURCES = ha_innodb.cc ha_berkeley.cc ha_archive.cc \ --- 1.156/sql/lex.h 2006-03-20 20:36:13 +01:00 +++ 1.157/sql/lex.h 2006-03-29 17:40:50 +02:00 @@ -521,6 +521,7 @@ static SYMBOL symbols[] = { { "TEMPORARY", SYM(TEMPORARY)}, { "TEMPTABLE", SYM(TEMPTABLE_SYM)}, { "TERMINATED", SYM(TERMINATED)}, + { "TEST", SYM(TEST_SYM)}, { "TEXT", SYM(TEXT_SYM)}, { "THAN", SYM(THAN_SYM)}, { "THEN", SYM(THEN_SYM)}, --- 1.387/sql/mysql_priv.h 2006-03-22 07:57:50 +01:00 +++ 1.388/sql/mysql_priv.h 2006-03-29 17:40:50 +02:00 @@ -83,7 +83,7 @@ char *sql_strmake_with_convert(const cha CHARSET_INFO *from_cs, uint32 max_res_length, CHARSET_INFO *to_cs, uint32 *result_length); -void kill_one_thread(THD *thd, ulong id, bool only_kill_query); +uint kill_one_thread(THD *thd, ulong id, bool only_kill_query); bool net_request_file(NET* net, const char* fname); char* query_table_status(THD *thd,const char *db,const char *table_name); --- 1.224/sql/sql_lex.h 2006-03-20 20:39:47 +01:00 +++ 1.225/sql/sql_lex.h 2006-03-29 17:40:50 +02:00 @@ -111,7 +111,7 @@ enum enum_sql_command { SQLCOM_SHOW_AUTHORS, SQLCOM_BINLOG_BASE64_EVENT, SQLCOM_SHOW_PLUGINS, SQLCOM_CREATE_EVENT, SQLCOM_ALTER_EVENT, SQLCOM_DROP_EVENT, - SQLCOM_SHOW_CREATE_EVENT, SQLCOM_SHOW_EVENTS, + SQLCOM_SHOW_CREATE_EVENT, SQLCOM_SHOW_EVENTS, SQLCOM_SHOW_EVENTS_TEST, /* This should be the last !!! */ --- 1.534/sql/sql_parse.cc 2006-03-21 13:10:09 +01:00 +++ 1.535/sql/sql_parse.cc 2006-03-29 17:40:50 +02:00 @@ -3860,6 +3860,13 @@ end_with_restore_list: res= evex_show_create_event(thd, lex->spname, lex->et->definer); break; } +#ifndef DBUG_OFF + case SQLCOM_SHOW_EVENTS_TEST: + { + res= Event_scheduler_manager::run_tests(thd); + break; + } +#endif case SQLCOM_CREATE_FUNCTION: // UDF function { if (check_access(thd,INSERT_ACL,"mysql",0,1,0,0)) @@ -6872,7 +6879,7 @@ bool reload_acl_and_cache(THD *thd, ulon This is written such that we have a short lock on LOCK_thread_count */ -void kill_one_thread(THD *thd, ulong id, bool only_kill_query) +uint kill_one_thread(THD *thd, ulong id, bool only_kill_query) { THD *tmp; uint error=ER_NO_SUCH_THREAD; @@ -6906,6 +6913,8 @@ void kill_one_thread(THD *thd, ulong id, send_ok(thd); else my_error(error, MYF(0), id); + + return error; } --- 1.484/sql/sql_yacc.yy 2006-03-21 13:10:10 +01:00 +++ 1.485/sql/sql_yacc.yy 2006-03-29 17:40:51 +02:00 @@ -630,6 +630,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token TEMPORARY %token TEMPTABLE_SYM %token TERMINATED +%token TEST_SYM %token TEXT_STRING %token TEXT_SYM %token TIMESTAMP @@ -8149,6 +8150,15 @@ show_param: lex->select_lex.db= $3; if (prepare_schema_table(YYTHD, lex, 0, SCH_EVENTS)) YYABORT; + } + | EVENTS_SYM TEST_SYM + { +#ifdef DBUG_OFF + yyerror(ER(ER_SYNTAX_ERROR)); + YYABORT; +#else + Lex->sql_command= SQLCOM_SHOW_EVENTS_TEST; +#endif } | TABLE_SYM STATUS_SYM opt_db wild_and_where { --- 1.37/sql/event.cc 2006-02-28 20:32:30 +01:00 +++ 1.38/sql/event.cc 2006-03-29 17:40:48 +02:00 @@ -721,7 +721,7 @@ trunc_err: db_update_event. The name of the event is inside "et". */ -static int +int db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, uint *rows_affected) { @@ -954,7 +954,7 @@ err: 2) tbl is not closed at exit */ -static int +int db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett, TABLE *tbl, MEM_ROOT *root) { --- 1.27/sql/event.h 2006-03-16 13:14:32 +01:00 +++ 1.28/sql/event.h 2006-03-29 17:40:48 +02:00 @@ -19,6 +19,7 @@ #include "sp.h" #include "sp_head.h" +#include "event_manager.h" #define EVEX_OK SP_OK #define EVEX_KEY_NOT_FOUND SP_KEY_NOT_FOUND @@ -75,6 +76,8 @@ enum evex_table_field EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */ } ; +int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); + class Event_timed { Event_timed(const Event_timed &); /* Prevent use of these */ @@ -286,6 +289,20 @@ public: delete sphead; sphead= 0; } + + bool + same_name(Event_timed *etn) + { + return (!sortcmp_lex_string(etn->name, name, system_charset_info) && + !sortcmp_lex_string(etn->dbname, dbname, system_charset_info) && + !sortcmp_lex_string(etn->definer, definer, system_charset_info) + ); + } + + int + kill_thread(); + + protected: bool change_security_context(THD *thd, Security_context *s_ctx, @@ -314,8 +331,6 @@ evex_open_event_table(THD *thd, enum thr int evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer); -int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); - int event_reconstruct_interval_expression(String *buf, interval_type interval, @@ -335,8 +350,6 @@ shutdown_events(); // auxiliary int event_timed_compare(Event_timed **a, Event_timed **b); - - /* CREATE TABLE event ( --- 1.41/sql/event_executor.cc 2006-03-01 04:21:57 +01:00 +++ 1.42/sql/event_executor.cc 2006-03-29 17:40:49 +02:00 @@ -17,6 +17,7 @@ #include "event_priv.h" #include "event.h" #include "sp.h" +#include "event_manager.h" #define WAIT_STATUS_READY 0 #define WAIT_STATUS_EMPTY_QUEUE 1 @@ -108,6 +109,7 @@ evex_init_mutexes() return; evex_mutexes_initted= TRUE; + pthread_mutex_init(&Event_scheduler_manager::LOCK_manager, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_event_arrays, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_workers_count, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST); @@ -237,10 +239,17 @@ shutdown_events() evex_mutexes_initted= FALSE; VOID(pthread_mutex_lock(&LOCK_evex_running)); VOID(pthread_mutex_unlock(&LOCK_evex_running)); - + + if (Event_scheduler_manager::destroy()) + { + DBUG_ASSERT(0); + sql_print_error("SCHEDULER: Trying to destroy while still running!"); + } pthread_mutex_destroy(&LOCK_event_arrays); pthread_mutex_destroy(&LOCK_workers_count); pthread_mutex_destroy(&LOCK_evex_running); + pthread_mutex_destroy(&Event_scheduler_manager::LOCK_manager); + } DBUG_VOID_RETURN; } --- New file --- +++ sql/event_manager.cc 06/03/29 17:40:51 /* Copyright (C) 2004-2005 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "event_priv.h" #include "event.h" /* Todo: 1. Empty queue at the end of ::run() */ #define CHECK_RUNNING(__ret_code) \ if (mode != MANAGER_RUNNING) \ { \ DBUG_PRINT("info", ("manager not running but %d. doing nothing", mode)); \ pthread_mutex_unlock(&LOCK_mutex); \ DBUG_RETURN(__ret_code); \ } Event_scheduler_manager *Event_scheduler_manager::ston= NULL; pthread_mutex_t Event_scheduler_manager::LOCK_manager; /* Inits an scheduler thread handler, both the main and a worker SYNOPSIS init_event_thread() thd - the THD of the thread. Has to be allocated by the caller. NOTES 1. The host of the thead is my_localhost 2. thd->net is initted with NULL - no communication. Returns 0 OK -1 Error */ static int init_event_thread(THD* thd) { DBUG_ENTER("init_event_thread"); thd->client_capabilities= 0; thd->security_ctx->master_access= 0; thd->security_ctx->db_access= 0; thd->security_ctx->host_or_ip= (char*)my_localhost; my_net_init(&thd->net, 0); thd->net.read_timeout = slave_net_timeout; thd->slave_thread= 0; thd->options|= OPTION_AUTO_IS_NULL; thd->client_capabilities= CLIENT_LOCAL_FILES; thd->real_id=pthread_self(); VOID(pthread_mutex_lock(&LOCK_thread_count)); thd->thread_id= thread_id++; threads.append(thd); thread_count++; thread_running++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); if (init_thr_lock() || thd->store_globals()) { thd->cleanup(); delete thd; DBUG_RETURN(-1); } #if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) sigset_t set; VOID(sigemptyset(&set)); // Get mask in use VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); #endif thd->proc_info= "Initialized"; thd->version= refresh_version; thd->set_time(); DBUG_RETURN(0); } /* Inits the main manager thread and then calls Event_scheduler_manager::run() of arg. SYNOPSIS event_scheduler_run_proxy() arg void* ptr to Event_scheduler_manager NOTES 1. The host of the thead is my_localhost 2. thd->net is initted with NULL - no communication. Returns 0 OK -1 Error */ pthread_handler_t event_scheduler_run_proxy(void *arg) { THD *thd= NULL; // needs to be first for thread_stack Event_scheduler_manager *esm= (Event_scheduler_manager *) arg; DBUG_ENTER("event_scheduler_run_proxy"); my_thread_init(); /* note that contructor of THD uses DBUG_ ! */ if (!(thd = new THD)) { sql_print_error("SCHEDULER: Cannot create THD for the main thread."); goto finish; } thd->thread_stack = (char*)&thd; // remember where our stack is pthread_detach_this_thread(); if (init_event_thread(thd)) { sql_print_error("SCHEDULER: Cannot init main event thread."); goto finish; } /* Guarantees that we will see the thread in SHOW PROCESSLIST though its vio is NULL. */ thd->system_thread= 1; thd->security_ctx->user= my_strdup("event_scheduler", MYF(0)); sql_print_information("SCHEDULER: Main thread started"); esm->run(thd); /* NOTE: Don't touch `esm` after this point because we have notified the thread which shuts us down that we have finished cleaning. In this very moment a new manager thread could be started and a crash is not welcome. */ THD_CHECK_SENTRY(thd); finish: /* If we cannot create THD then don't decrease because we haven't touched thread_count and thread_running in init_event_thread() which was never called. In init_event_thread() thread_count and thread_running are always increased even in the case the method returns an error. */ if (thd) { pthread_mutex_lock(&LOCK_thread_count); thread_count--; thread_running--; pthread_mutex_unlock(&LOCK_thread_count); thd->proc_info = "Clearing"; DBUG_ASSERT(thd->net.buff != 0); net_end(&thd->net); THD_CHECK_SENTRY(thd); delete thd; } my_thread_end(); pthread_exit(0); DBUG_RETURN(0); // Can't return anything here } /* Adds an event to the executor queue Synopsis Event_scheduler_manager::add_event() et The event to add Returns FALSE OK TRUE Failure */ Event_scheduler_manager* Event_scheduler_manager::get_instance() { Event_scheduler_manager *tmp= NULL; DBUG_ENTER("Event_scheduler_manager::get_instance"); if (sizeof(my_time_t) != sizeof(time_t)) { sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." "The scheduler will not work correctly. Stopping."); DBUG_ASSERT(0); DBUG_RETURN(NULL); } pthread_mutex_lock(&LOCK_manager); if (ston) goto skip_init; /* Check it rarely (when creating instance) but do check it */ check_system_tables(); tmp= new Event_scheduler_manager; if (init_queue_ex(&tmp->queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, event_timed_compare_q, NULL, 30 /*auto_extent*/)) { sql_print_error("SCHEDULER: Insufficient memory to initialize " "executing queue."); goto err; } if (pthread_mutex_init(&tmp->LOCK_mutex, MY_MUTEX_INIT_FAST)) { sql_print_error("SCHEDULER: Unable to initalize lock LOCK_mutex"); goto err; } if (pthread_cond_init(&tmp->COND_new_work, NULL)) { sql_print_error("SCHEDULER: Unable to initialize COND_new_work"); goto err; } if (pthread_cond_init(&tmp->COND_started, NULL)) { sql_print_error("SCHEDULER: Unable to initialize COND_started"); goto err; } if (pthread_cond_init(&tmp->COND_shutdown, NULL)) { sql_print_error("SCHEDULER: Unable to initialize COND_shutdown"); goto err; } /* init memory root */ init_alloc_root(&tmp->manager_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); tmp->mode= MANAGER_INIT; ston= tmp; skip_init: pthread_mutex_unlock(&LOCK_manager); DBUG_RETURN(ston); err: pthread_mutex_unlock(&LOCK_manager); delete tmp; DBUG_RETURN(NULL); } /* Adds an event to the executor queue Synopsis Event_scheduler_manager::add_event() et The event to add Returns FALSE OK TRUE Failure */ bool Event_scheduler_manager::add_event(THD *thd, Event_timed *et, bool check_existance) { Event_timed *et_new; DBUG_ENTER("Event_scheduler_manager::add_event"); pthread_mutex_lock(&LOCK_mutex); CHECK_RUNNING(false); if (check_existance) if (find_event(et, false)) { pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(true); } /* We need to load the event on manager_root */ if ((et_new= load_and_compile_event(thd, et))) { queue_insert_safe(&queue, (byte *) et_new); pthread_cond_signal(&COND_new_work); } pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(!et_new); } /* Drops an event from the executor queue Synopsis Event_scheduler_manager::drop_event() etn The event to drop mode Wait the event or kill&drop Returns FALSE OK TRUE Failure */ bool Event_scheduler_manager::drop_event(THD *thd, Event_timed *etn, enum event_drop_mode drop_mode) { Event_timed *et; DBUG_ENTER("Event_scheduler_manager::drop_event"); pthread_mutex_lock(&LOCK_mutex); CHECK_RUNNING(false); if ((et= find_event(etn, true))) et->kill_thread(); else DBUG_PRINT("info", ("no such event found")); pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(false); } /* Replaces an event in the executor queue Synopsis Event_scheduler_manager::replace_event() et The event to replace(add) into the queue mode Wait the event or kill&drop Returns FALSE OK TRUE Failure */ bool Event_scheduler_manager::replace_event(THD *thd, Event_timed *et, enum event_drop_mode replace_mode) { Event_timed *et_old, *et_new= NULL; DBUG_ENTER("Event_scheduler_manager::replace_event"); pthread_mutex_lock(&LOCK_mutex); CHECK_RUNNING(false); if ((et_old= find_event(et, true))) et_old->kill_thread(); else DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", et->dbname.str, et->name.str)); /* We need to load the event on manager_root */ if ((et_new= load_and_compile_event(thd, et))) { queue_insert_safe(&queue, (byte *) et_new); pthread_cond_signal(&COND_new_work); } pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(!et_new); } /* Searches for an event in the executor queue Synopsis Event_scheduler_manager::find_event() etn The event to find remove_from_q If found whether to remove from the Q Returns NULL Not found otherwise Address */ Event_timed * Event_scheduler_manager::find_event(Event_timed *etn, bool remove_from_q) { uint i; DBUG_ENTER("Event_scheduler_manager::find_event"); for (i= 0; i < queue.elements; ++i) { Event_timed *et= (Event_timed *) queue_element(&queue, i); DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", etn->dbname.str, etn->name.str, et->dbname.str, et->name.str)); if (etn->same_name(et)) { if (remove_from_q) queue_remove(&queue, i); DBUG_RETURN(et); } } DBUG_RETURN(NULL); } extern pthread_attr_t connection_attrib; /* Starts the event queue manager Synopsis Event_scheduler_manager::start() Returns EVENT_OP_OK OK EVENT_OP_CANTFORK Cannot create a new thread EVENT_OP_CANTSTART ::run() had problem booting EVENT_OP_ALREADY_RUNNING Manager already running */ enum Event_scheduler_manager::event_manager_op_code Event_scheduler_manager::start() { enum event_manager_op_code ret; pthread_t th; DBUG_ENTER("Event_scheduler_manager::start"); pthread_mutex_lock(&LOCK_mutex); /* If already working or starting don't make another attempt */ if (mode > MANAGER_INIT) { DBUG_PRINT("info", ("manager is already running or starting")); ret= EVENT_OP_ALREADY_RUNNING; goto finish; } /* Now if another thread calls start it will bail-out because the branch above will be executed. Thus no two or more child threads will be forked. We the child thread cannot start for some reason then `mode` is set to MANAGER_CANTSTART and COND_started is also signaled. In this case we set `mode` back to MANAGER_INIT so another attempt to start the manager can be made. */ mode= MANAGER_STARTING; /* Fork */ if (pthread_create(&th, &connection_attrib, event_scheduler_run_proxy, (void*)this)) { DBUG_PRINT("error", ("cannot create a new thread")); mode= MANAGER_INIT; ret= EVENT_OP_CANTFORK; goto finish; } /* Wait till the child thread has booted (w/ or wo success) */ while (mode != MANAGER_RUNNING && mode != MANAGER_CANTSTART) pthread_cond_wait(&COND_started, &LOCK_mutex); /* If we cannot start for some reason then don't prohibit further attempts. Set back to MANAGER_INIT. */ if (mode == MANAGER_CANTSTART) { mode= MANAGER_INIT; ret= EVENT_OP_CANTSTART; goto finish; } ret= EVENT_OP_OK; finish: pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(ret); } /* The internal loop of the event queue manager Synopsis Event_scheduler_manager::run() Returns FALSE OK TRUE Failure */ bool Event_scheduler_manager::run(THD *thd) { int ret; struct timespec abstime; DBUG_ENTER("Event_scheduler_manager::run"); /* Load events from disk. `mode` is currently MANAGER_STARTING. Hence, there is no race between loading and add_event() because the latter won't touch the queue until the mode is not MANAGER_RUNNING, which is set a bit later in this function. */ ret= load_events_from_db(thd); pthread_mutex_lock(&LOCK_mutex); if (!ret) { thread_id= thd->thread_id; sql_print_information("SCHEDULER: Thread_id %d", thread_id); mode= MANAGER_RUNNING; } else mode= MANAGER_CANTSTART; pthread_cond_signal(&COND_started); pthread_mutex_unlock(&LOCK_mutex); if (ret) DBUG_RETURN(true); abstime.tv_nsec= 0; while (!thd->killed) { TIME time_next; int t2sleep; Event_timed *et; my_bool tmp; pthread_mutex_lock(&LOCK_mutex); while (!queue.elements) pthread_cond_wait(&COND_new_work, &LOCK_mutex); et= (Event_timed *)queue_top(&queue); DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); /* This timestamp is in UTC */ abstime.tv_sec= (time_t)TIME_to_timestamp(thd, &et->execute_at, &tmp); /* Convert the UTC timestamp to the local timezone*/ my_tz_SYSTEM->gmt_sec_to_TIME(&time_next, abstime.tv_sec); abstime.tv_sec= (time_t) TIME_to_timestamp(thd, &time_next, &tmp); thd->end_time(); if (abstime.tv_sec > thd->query_start()) { DBUG_PRINT("info", ("Going to sleep")); pthread_cond_timedwait(&COND_new_work, &LOCK_mutex, &abstime); } else { /* Execute the event */ } pthread_mutex_unlock(&LOCK_mutex); } sql_print_information("SCHEDULER: Emptying the queue."); /* empty the queue */ /* free mamager_root memory but don't destroy the root */ /* We notify the waiting thread which shutdowns us that we have cleaned. There are few more instructions to be executed in this pthread but they don't affect manager structures thus it's safe to signal already at this point. */ pthread_mutex_lock(&LOCK_mutex); /* Don't mode set to MANAGER_STOPPED because ::start() checks (mode > MANAGER_INIT) to decide whether it can fork a new thread or return. MANAGER_STOPPED is used by the dtor of Event_scheduler_manager, thus preventing a fork of a new thread while the manager object is being destroyed. */ mode= MANAGER_INIT; sql_print_information("SCHEDULER: Stopped."); pthread_cond_signal(&COND_shutdown); pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(false); } /* Shutdowns the event queue manager Synopsis Event_scheduler_manager::shutdown() mode Wait the events or kill them immediately Returns EVENT_OP_OK OK EVENT_OP_CANT_KILL Error during stopping of manager thread EVENT_OP_NOT_RUNNING Manager not working */ enum Event_scheduler_manager::event_manager_op_code Event_scheduler_manager::shutdown(enum event_drop_mode stop_mode) { int ret; THD *thd= current_thd; DBUG_ENTER("Event_scheduler_manager::shutdown"); pthread_mutex_lock(&LOCK_mutex); CHECK_RUNNING(EVENT_OP_NOT_RUNNING); mode= MANAGER_SHUTDOWN; if ((ret= kill_one_thread(thd, thread_id, false))) { DBUG_PRINT("error", ("Error while killing thread %d errcode=%d", thread_id, ret)); pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(EVENT_OP_CANT_KILL); } /* Guarantee we don't catch spurious signals */ while (mode != MANAGER_INIT) pthread_cond_wait(&COND_shutdown, &LOCK_mutex); DBUG_PRINT("info", ("Manager thread has cleaned up. Set mode to INIT")); pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(EVENT_OP_OK); } /* Returns the number of elements in the queue Synopsis Event_scheduler_manager::events_count() Returns -1 Manager not working >=0 Number of Event_timed objects in the queue */ int Event_scheduler_manager::events_count() { int n; DBUG_ENTER("Event_scheduler_manager::events_count"); pthread_mutex_lock(&LOCK_manager); n= (mode == MANAGER_RUNNING)? queue.elements:-1; pthread_mutex_unlock(&LOCK_manager); DBUG_RETURN(n); } /* Looks for a named event in mysql.event and then loads it from the table, compiles and inserts it into the cache. SYNOPSIS Event_scheduler_manager::load_and_compile_event() thd THD etn The name of the event to load and compile on manager's root RETURN VALUE NULL Error during compile otherwise Address */ Event_timed * Event_scheduler_manager::load_and_compile_event(THD *thd, Event_timed *etn) { int ret= 0; MEM_ROOT *tmp_mem_root; Event_timed *et_new; Open_tables_state backup; DBUG_ENTER("Event_scheduler_manager::load_and_compile_event"); DBUG_PRINT("enter", ("name: %s", etn->name.str)); thd->reset_n_backup_open_tables_state(&backup); /* No need to use my_error() here because db_find_event() has done it */ { sp_name spn(etn->dbname, etn->name); ret= db_find_event(thd, &spn, &etn->definer, &et_new, NULL, &manager_root); } thd->restore_backup_open_tables_state(&backup); if (ret) DBUG_RETURN(NULL); tmp_mem_root= thd->mem_root; thd->mem_root= &manager_root; /* Allocate on manager_root. If you call without manager_root then sphead will not be cleared! */ if (!(ret= et_new->compile(thd, &manager_root))) et_new->compute_next_execution_time(); thd->mem_root= tmp_mem_root; if (ret) { delete et_new; et_new= NULL; } DBUG_RETURN(et_new); } /* Loads all ENABLED events from mysql.event into the prioritized queue. Called during scheduler main thread initialization. Compiles the events. Creates Event_timed instances for every ENABLED event from mysql.event. SYNOPSIS Event_scheduler_manager::load_events_from_db() thd - Thread context. Used for memory allocation in some cases. RETURNS 0 OK !0 Error NOTES Reports the error to the console */ int Event_scheduler_manager::load_events_from_db(THD *thd) { TABLE *table; READ_RECORD read_record_info; int ret= -1; uint count= 0; DBUG_ENTER("evex_load_events_from_db"); pthread_mutex_lock(&LOCK_mutex); if (mode > MANAGER_STARTING) { DBUG_ASSERT(0); sql_print_error("SCHEDULER: Trying to load events while already running."); pthread_mutex_unlock(&LOCK_mutex); DBUG_RETURN(EVEX_GENERAL_ERROR); } pthread_mutex_unlock(&LOCK_mutex); if ((ret= evex_open_event_table(thd, TL_READ, &table))) { sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); } init_read_record(&read_record_info, thd, table ,NULL,1,0); while (!(read_record_info.read_record(&read_record_info))) { Event_timed *et; if (!(et= new Event_timed)) { DBUG_PRINT("load_events_from_db", ("Out of memory")); ret= -1; goto end; } DBUG_PRINT("load_events_from_db", ("Loading event from row.")); if ((ret= et->load_from_row(&manager_root, table))) { sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); goto end; } if (et->status != MYSQL_EVENT_ENABLED) { DBUG_PRINT("load_events_from_db",("%s is disabled",et->name.str)); delete et; continue; } DBUG_PRINT("load_events_from_db", ("Event %s loaded from row. Time to compile", et->name.str)); switch (ret= et->compile(thd, &manager_root)) { case EVEX_MICROSECOND_UNSUP: sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " "supported but found in mysql.event"); goto end; case EVEX_COMPILE_ERROR: sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", et->dbname.str, et->name.str); goto end; default: break; } /* let's find when to be executed */ if (et->compute_next_execution_time()) { sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." " Skipping", et->dbname.str, et->name.str); continue; } DBUG_PRINT("load_events_from_db", ("Adding to the exec list.")); queue_insert_safe(&queue, (byte *) et); DBUG_PRINT("load_events_from_db", ("%p %*s", et, et->name.length,et->name.str)); count++; } ret= 0; end: end_read_record(&read_record_info); /* Force close to free memory */ thd->version--; close_thread_tables(thd); if (!ret) sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); } /* Opens mysql.db and mysql.user and checks whether: 1. mysql.db has column Event_priv at column 20 (0 based); 2. mysql.user has column Event_priv at column 29 (0 based); Synopsis Event_scheduler_manager::evex_check_system_tables() */ void Event_scheduler_manager::check_system_tables() { THD *thd= current_thd; TABLE_LIST tables; bool not_used; Open_tables_state backup; /* thd is 0x0 during boot of the server. Later it's !=0x0 */ if (!thd) return; thd->reset_n_backup_open_tables_state(&backup); bzero((char*) &tables, sizeof(tables)); tables.db= (char*) "mysql"; tables.table_name= tables.alias= (char*) "db"; tables.lock_type= TL_READ; if (simple_open_n_lock_tables(thd, &tables)) sql_print_error("Cannot open mysql.db"); else { table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, mysql_db_table_fields, &mysql_db_table_last_check,ER_CANNOT_LOAD_FROM_TABLE); close_thread_tables(thd); } bzero((char*) &tables, sizeof(tables)); tables.db= (char*) "mysql"; tables.table_name= tables.alias= (char*) "user"; tables.lock_type= TL_READ; if (simple_open_n_lock_tables(thd, &tables)) sql_print_error("Cannot open mysql.db"); else { if (tables.table->s->fields < 29 || strncmp(tables.table->field[29]->field_name, STRING_WITH_LEN("Event_priv"))) sql_print_error("mysql.user has no `Event_priv` column at position 29"); close_thread_tables(thd); } thd->restore_backup_open_tables_state(&backup); } #ifndef DBUG_OFF #define ERROR_W_MESSAGE(cond, msg) \ { \ sql_print_information("Testing %s", msg); \ DBUG_PRINT("info",("Testing %s", msg)); \ protocol->prepare_for_resend(); \ protocol->store(msg, strlen(msg), system_charset_info); \ if (!(cond)){ \ protocol->store((char*)STRING_WITH_LEN("ERROR"), system_charset_info); \ ret= protocol->write(); \ goto err; \ } else {\ protocol->store((char*)STRING_WITH_LEN("OK"), system_charset_info); \ ret= protocol->write(); \ if (ret) \ goto err; \ } \ } bool Event_scheduler_manager::run_tests(THD *thd) { Protocol *protocol= thd->protocol; Event_scheduler_manager *mgr; List field_list; int ret; DBUG_ENTER("evex_manager_tester"); field_list.push_back(new Item_empty_string("Test", 100)); field_list.push_back(new Item_empty_string("Result", 5)); if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(true); Event_timed et; et.dbname.str= (char *)"test"; et.dbname.length= strlen("test"); et.name.str= (char *)"event_name"; et.name.length= strlen("event_name"); et.definer.str= (char *)"definer@host"; et.definer.length= strlen("definer@host"); et.body.str= (char *) "select 1 from dual"; et.body.length= strlen("select 1 from dual"); et.expression= 1; et.interval= INTERVAL_MINUTE; Event_timed et2; et2.dbname.str= (char *)"test"; et2.dbname.length= strlen("test"); et2.name.str= (char *)"event_name2"; et2.name.length= strlen("event_name2"); et2.definer.str= (char *)"definer@host"; et2.definer.length= strlen("definer@host"); et2.body.str= (char *) "select 2 from dual"; et2.body.length= strlen("select 2 from dual"); et2.expression= 2; et2.interval= INTERVAL_HOUR; uint tmp; /* Force new manager instantiation */ ERROR_W_MESSAGE(!Event_scheduler_manager::destroy(), "Destroy old manager"); ERROR_W_MESSAGE(!Event_scheduler_manager::ston, "Old manager"); mgr= Event_scheduler_manager::get_instance(); ERROR_W_MESSAGE(mgr, "Create new manager"); ERROR_W_MESSAGE((mgr->mode == Event_scheduler_manager::MANAGER_INIT), "Test manager mode "); ERROR_W_MESSAGE(mgr->events_count() == -1, "Test count"); ERROR_W_MESSAGE(!db_create_event(thd, &et, false, &tmp), "Create event on disk"); ERROR_W_MESSAGE(!mgr->start(), "Start manager"); mgr->add_event(thd, &et, false); ERROR_W_MESSAGE(mgr->events_count() == 2, "Test count wo check"); mgr->add_event(thd, &et, true); ERROR_W_MESSAGE(mgr->events_count() == 2, "Test count with check"); mgr->add_event(thd, &et, false); ERROR_W_MESSAGE(mgr->events_count() == 3, "Test count wo check"); mgr->add_event(thd, &et2, true); ERROR_W_MESSAGE(mgr->events_count() == 3, "Test count after add of et2"); ERROR_W_MESSAGE(!db_create_event(thd, &et2, false, &tmp), "Create event et2 on disk"); mgr->add_event(thd, &et2, true); ERROR_W_MESSAGE(mgr->events_count() == 4, "Test count after add of et2 (again)"); mgr->drop_event(thd, &et, Event_scheduler_manager::EVENT_STOP_IMMED); ERROR_W_MESSAGE(mgr->events_count() == 3, "Test delete 1"); mgr->drop_event(thd, &et, Event_scheduler_manager::EVENT_STOP_IMMED); ERROR_W_MESSAGE(mgr->events_count() == 2, "Test delete 2"); mgr->drop_event(thd, &et, Event_scheduler_manager::EVENT_STOP_IMMED); ERROR_W_MESSAGE(mgr->events_count() == 1, "Test delete 3"); et2.body.str= (char *) "select 212 from dual"; et2.body.length= strlen("select 212 from dual"); mgr->replace_event(thd, &et2, Event_scheduler_manager::EVENT_STOP_IMMED); ERROR_W_MESSAGE(mgr->events_count() == 1, "Test delete 4"); /* Here we will get an empty line because kill_one_thread() which is used by ::shutdown() calls send_eof(). */ ERROR_W_MESSAGE(!mgr->shutdown(EVENT_STOP_IMMED), "Shutdown the manager"); ERROR_W_MESSAGE(!mgr->start(), "Start manager again and sleep 2s"); ERROR_W_MESSAGE(mgr->start()== Event_scheduler_manager::EVENT_OP_ALREADY_RUNNING, "Try to start again"); /* now there should be 2 events */ sleep(15); /* Here we will get an empty line because kill_one_thread() which is used by ::shutdown() calls send_eof(). */ ERROR_W_MESSAGE(!mgr->shutdown(EVENT_STOP_IMMED), "Shutdown manager"); ERROR_W_MESSAGE(mgr->shutdown(EVENT_STOP_IMMED)== Event_scheduler_manager::EVENT_OP_NOT_RUNNING, "Shutdown manager again for checking"); ERROR_W_MESSAGE(!Event_scheduler_manager::destroy(), "Destroy new manager"); ERROR_W_MESSAGE(!Event_scheduler_manager::ston, "Singleton"); ERROR_W_MESSAGE(!et.drop(thd), "et.drop()"); ERROR_W_MESSAGE(!et2.drop(thd), "et2.drop()"); ERROR_W_MESSAGE(TRUE, "All Tests"); send_eof(thd); DBUG_RETURN(false); err: mgr->shutdown(EVENT_STOP_IMMED); Event_scheduler_manager::destroy(); send_eof(thd); DBUG_RETURN(true); } #endif --- New file --- +++ sql/event_manager.h 06/03/29 17:40:52 #ifndef _EVENT_MANAGER_H_ #define _EVENT_MANAGER_H_ class THD; class Event_scheduler_manager { public: static pthread_mutex_t LOCK_manager; enum event_drop_mode { EVENT_STOP_IMMED = 1, EVENT_STOP_WAIT =2 }; enum event_manager_mode { MANAGER_INIT= 0, MANAGER_STARTING, MANAGER_CANTSTART, MANAGER_RUNNING, MANAGER_SHUTDOWN, MANAGER_STOPPED, }; enum event_manager_op_code { EVENT_OP_OK = 0, EVENT_OP_CANTFORK, EVENT_OP_CANTSTART, EVENT_OP_ALREADY_RUNNING, EVENT_OP_NOT_RUNNING, EVENT_OP_CANT_KILL }; static Event_scheduler_manager* get_instance(); bool add_event(THD *thd, Event_timed *et, bool check_existance); bool drop_event(THD *thd, Event_timed *etn, enum event_drop_mode drop_mode); bool replace_event(THD *thd, Event_timed *et, enum event_drop_mode replace_mode); /* LOCKING PROTOCOL: Does not do any locking. You are responsible to lock */ Event_timed * find_event(Event_timed *etn, bool remove_from_q); bool trigger_event(THD *thd, Event_timed *etn) { DBUG_ASSERT(0);} int events_count(); enum event_manager_op_code start(); bool run(THD *thd); enum event_manager_op_code shutdown(enum event_drop_mode stop_mode); /* This method is pretty dangerous and we use it only for testing purposes. */ static bool destroy() { pthread_mutex_lock(&LOCK_manager); if (ston && ston->mode == MANAGER_RUNNING) { /* the master thread should not be running */ DBUG_ASSERT(0); return true; } delete ston; ston= NULL; pthread_mutex_unlock(&LOCK_manager); return false; } #ifndef DBUG_OFF static bool run_tests(THD *thd); #endif protected: /* helper functions */ static void check_system_tables(); Event_timed * load_and_compile_event(THD *thd, Event_timed *etn); int load_events_from_db(THD *thd); private: /* Prevent use of these */ Event_scheduler_manager(const Event_scheduler_manager &); void operator=(Event_scheduler_manager &); Event_scheduler_manager():thread_id(0) {} /* LOCK PROCOTOL: Expects that this method is synchronized */ ~Event_scheduler_manager() { /* ToDo: Wait if there are elements on the queue */ pthread_mutex_lock(&LOCK_mutex); ston->mode= MANAGER_SHUTDOWN; pthread_mutex_unlock(&LOCK_mutex); delete_queue(&queue); pthread_cond_destroy(&COND_new_work); pthread_cond_destroy(&COND_started); mode= MANAGER_STOPPED; } static Event_scheduler_manager *ston; enum event_manager_mode mode; pthread_mutex_t LOCK_mutex; ulonglong thread_id; pthread_cond_t COND_new_work; pthread_cond_t COND_started; pthread_cond_t COND_shutdown; MEM_ROOT manager_root; QUEUE queue; }; #endif /* _EVENT_MANAGER_H_ */ --- 1.20/sql/event_priv.h 2006-02-28 18:33:25 +01:00 +++ 1.21/sql/event_priv.h 2006-03-29 17:40:49 +02:00 @@ -46,7 +46,13 @@ event_timed_compare_q(void *vptr, byte* int db_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, uint *rows_affected); +int +db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett, + TABLE *tbl, MEM_ROOT *root); +int +db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, + uint *rows_affected); #define EXEC_QUEUE_QUEUE_NAME executing_queue #define EXEC_QUEUE_DARR_NAME evex_executing_queue --- 1.47/sql/event_timed.cc 2006-03-17 09:36:32 +01:00 +++ 1.48/sql/event_timed.cc 2006-03-29 17:40:49 +02:00 @@ -1546,3 +1546,20 @@ Event_timed::spawn_unlock(THD *thd) VOID(pthread_mutex_unlock(&this->LOCK_running)); return ret; } + + +/* + Kills a running event + Synopsis + Event_timed::kill_thread() + + Returns + 0 OK + !0 Error +*/ + +int +Event_timed::kill_thread() +{ + return false; +}