Below is the list of changes that have just been committed into a local
5.1 repository of andrey. When andrey does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1993 05/12/12 21:19:19 andrey@lmy004. +7 -0
WL#1034 update
The QUEUE implementation is working now. This should be more or less ready
for testing once the debug output has been cleaned up and some things
around DYNAMIC_ARRAY are cleaned up.
- Fix the handling of errors that led to crashes; there are now no more
crashes in case of table corruption and the like.
sql/event_timed.cc
1.9 05/12/12 21:19:12 andrey@lmy004. +3 -4
allocate one more byte and zero-terminate, really
sql/event_priv.h
1.7 05/12/12 21:19:12 andrey@lmy004. +71 -9
- reorder a bit
- add macros and functions for queue manipulation which sit on top
of QUEUE (partly implemented for DYNAMIC_ARRAY as well, but this will be
cleaned up so that they are only for QUEUE).
sql/event_executor.cc
1.7 05/12/12 21:19:12 andrey@lmy004. +137 -89
reorder
sql/event.h
1.7 05/12/12 21:19:12 andrey@lmy004. +2 -4
reorder
sql/event.cc
1.10 05/12/12 21:19:12 andrey@lmy004. +117 -10
- move mysql_priv.h inclusion to event_priv.h
- use a priority queue instead of a sorted DYNAMIC_ARRAY
mysys/queues.c
1.17 05/12/12 21:19:12 andrey@lmy004. +66 -1
add init_queue_ex() implementation
add queue_insert_safe() implementation
include/queues.h
1.13 05/12/12 21:19:12 andrey@lmy004. +6 -0
introduce a safe version of queue_insert that will extend the queue if
necessary. The auto_extent value is passed to the _ex version of init_queue().
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: andrey
# Host: lmy004.
# Root: /work/mysql-5.1-tt-copy-works
--- 1.12/include/queues.h 2005-07-18 13:30:08 +02:00
+++ 1.13/include/queues.h 2005-12-12 21:19:12 +01:00
@@ -35,6 +35,7 @@
uint offset_to_key; /* compare is done on element+offset */
int max_at_top; /* Set if queue_top gives max */
int (*compare)(void *, byte *,byte *);
+ uint auto_extent;
} QUEUE;
#define queue_top(queue) ((queue)->root[1])
@@ -49,14 +50,19 @@
int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
void *first_cmp_arg);
+int init_queue_ex(QUEUE *queue,uint max_elements,uint offset_to_key,
+ pbool max_at_top, queue_compare compare,
+ void *first_cmp_arg, uint auto_extent);
int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
void *first_cmp_arg);
int resize_queue(QUEUE *queue, uint max_elements);
void delete_queue(QUEUE *queue);
void queue_insert(QUEUE *queue,byte *element);
+int queue_insert_safe(QUEUE *queue, byte *element);
byte *queue_remove(QUEUE *queue,uint idx);
#define queue_remove_all(queue) { (queue)->elements= 0; }
+#define queue_is_full(queue) (queue->elements == queue->max_elements)
void _downheap(QUEUE *queue,uint idx);
void queue_fix(QUEUE *queue);
#define is_queue_inited(queue) ((queue)->root != 0)
--- 1.16/mysys/queues.c 2005-07-18 13:30:08 +02:00
+++ 1.17/mysys/queues.c 2005-12-12 21:19:12 +01:00
@@ -19,7 +19,7 @@
Implemention of queues from "Algoritms in C" by Robert Sedgewick.
An optimisation of _downheap suggested in Exercise 7.51 in "Data
Structures & Algorithms in C++" by Mark Allen Weiss, Second Edition
+ was implemented by Mikael Ronstrom 2005. Also the O(N) algorithm
of queue_fix was implemented.
*/
@@ -67,6 +67,46 @@
}
+
+/*
+ Init queue, uses init_queue internally for init work but also accepts
+ auto_extent as parameter
+
+ SYNOPSIS
+ init_queue_ex()
+ queue Queue to initialise
+ max_elements Max elements that will be put in queue
+ offset_to_key Offset to key in element stored in queue
+ Used when sending pointers to compare function
+ max_at_top Set to 1 if you want biggest element on top.
+ compare Compare function for elements, takes 3 arguments.
+ first_cmp_arg First argument to compare function
+ auto_extent When the queue is full and there is insert operation
+ extend the queue.
+
+ NOTES
+ Will allocate max_element pointers for queue array
+
+ RETURN
+ 0 ok
+ 1 Could not allocate memory
+*/
+
+int init_queue_ex(QUEUE *queue, uint max_elements, uint offset_to_key,
+ pbool max_at_top, int (*compare) (void *, byte *, byte *),
+ void *first_cmp_arg, uint auto_extent)
+{
+ int ret;
+ DBUG_ENTER("init_queue_ex");
+
+ if ((ret= init_queue(queue, max_elements, offset_to_key, max_at_top, compare,
+ first_cmp_arg)))
+ DBUG_RETURN(ret);
+
+ queue->auto_extent= auto_extent;
+ DBUG_RETURN(0);
+}
+
/*
Reinitialize queue for other usage
@@ -191,6 +231,31 @@
queue->root[idx]=element;
}
}
+
+/*
+ Does safe insert. If no more space left on the queue resize it.
+ Return codes:
+ 0 - OK
+ 1 - Cannot allocate more memory
+ 2 - auto_extend is 0, the operation would
+
+*/
+
+int queue_insert_safe(register QUEUE *queue, byte *element)
+{
+
+ if (queue->elements == queue->max_elements)
+ {
+ if (!queue->auto_extent)
+ return 2;
+ else if (resize_queue(queue, queue->max_elements + queue->auto_extent))
+ return 1;
+ }
+
+ queue_insert(queue, element);
+ return 0;
+}
+
/* Remove item from queue */
/* Returns pointer to removed element */
--- 1.9/sql/event.cc 2005-12-08 20:37:48 +01:00
+++ 1.10/sql/event.cc 2005-12-12 21:19:12 +01:00
@@ -14,16 +14,14 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include "mysql_priv.h"
-#include "event.h"
#include "event_priv.h"
+#include "event.h"
#include "sp.h"
/*
TODO list :
- The default value of created/modified should not be 0000-00-00 because of
STRICT mode restricions.
- - Remove m_ prefixes of member variables.
- Use timestamps instead of datetime.
@@ -53,9 +51,17 @@
- Consider using conditional variable when doing shutdown instead of
waiting till all worker threads end.
- Make event_timed::get_show_create_event() work
+
- Add function documentation whenever needed.
+
- Add logging to file
+ - Move comparison code to class event_timed
+
+ - Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY.
+ This will skip copy operation as well as will simplify the code which is
+ now aware of events_array DYNAMIC_ARRAY
+
Warning:
- For now parallel execution is not possible because the same sp_head cannot be
executed few times!!! There is still no lock attached to particular event.
@@ -67,19 +73,60 @@
bool mysql_event_table_exists= 1;
DYNAMIC_ARRAY events_array;
-DYNAMIC_ARRAY evex_executing_queue;
+DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME;
+QUEUE EXEC_QUEUE_QUEUE_NAME;
MEM_ROOT evex_mem_root;
-
//extern volatile uint thread_running;
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//////////////// Static functions follow ///////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
+void
+evex_queue_init(EVEX_QUEUE_TYPE *queue)
+{
+#ifndef EVEX_USE_QUEUE
+ VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100));
+#else
+ if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/,
+ 0 /*smallest_on_top*/, event_timed_compare_q, NULL,
+ 100 /*auto_extent*/))
+ sql_print_error("Insufficient memory to initialize executing queue.");
+#endif
+}
+int
+evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element)
+{
+#ifndef EVEX_USE_QUEUE
+ VOID(push_dynamic(queue, element));
+ return 0;
+#else
+ return queue_insert_safe(queue, element);
+#endif
+}
+
+void
+evex_queue_top_updated(EVEX_QUEUE_TYPE *queue)
+{
+#ifdef EVEX_USE_QUEUE
+ queue_replaced(queue);
+#endif
+}
+
+void
+evex_queue_sort(EVEX_QUEUE_TYPE *queue)
+{
+#ifndef EVEX_USE_QUEUE
+ qsort((gptr) dynamic_element(queue, 0, event_timed**),
+ queue->elements,
+ sizeof(event_timed **),
+ (qsort_cmp) event_timed_compare);
+#endif
+}
/* NOTE Andrey: Document better
Compares two TIME structures.
@@ -98,7 +145,7 @@
}
-inline int
+int
my_time_compare(TIME *a, TIME *b)
{
/*
@@ -107,6 +154,7 @@
*/
DBUG_ENTER("my_time_compare");
+
if (a->year > b->year)
DBUG_RETURN(1);
@@ -143,19 +191,53 @@
if (a->second < b->second)
DBUG_RETURN(-1);
- /*!! second_part is not compared !*/
+
+ if (a->second_part > b->second_part)
+ DBUG_RETURN(1);
+
+ if (a->second_part < b->second_part)
+ DBUG_RETURN(-1);
+
DBUG_RETURN(0);
}
+int
+evex_time_diff(TIME *a, TIME *b)
+{
+ my_bool in_gap;
+ DBUG_ENTER("my_time_diff");
+
+ return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b);
+}
+
inline int
event_timed_compare(event_timed **a, event_timed **b)
{
- return my_time_compare(&(*a)->execute_at, &(*b)->execute_at);
+ my_ulonglong a_t, b_t;
+ a_t= TIME_to_ulonglong_datetime(&(*a)->execute_at)*100L +
+ (*a)->execute_at.second_part;
+ b_t= TIME_to_ulonglong_datetime(&(*b)->execute_at)*100L +
+ (*b)->execute_at.second_part;
+
+ if (a_t > b_t)
+ return 1;
+ else if (a_t < b_t)
+ return -1;
+ else
+ return 0;
+
+// return my_time_compare(&(*a)->execute_at, &(*b)->execute_at);
}
+int
+event_timed_compare_q(void *vptr, byte* a, byte *b)
+{
+ return event_timed_compare((event_timed **)&a, (event_timed **)&b);
+}
+
/*
Open mysql.event table for read
@@ -660,7 +742,10 @@
VOID(push_dynamic(&events_array,(gptr) ett));
ett_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
+/**
VOID(push_dynamic(&evex_executing_queue, (gptr) &ett_copy));
+**/
+ evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy);
/*
There is a copy in the array which we don't need. sphead won't be
@@ -674,11 +759,14 @@
qsort of events_array.elements (the current number of elements).
We know that the elements are stored in a contiguous block w/o holes.
*/
+/**
qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
evex_executing_queue.elements,
sizeof(event_timed **),
(qsort_cmp) event_timed_compare);
-
+**/
+ evex_queue_sort(&EVEX_EQ_NAME);
+
if (use_lock)
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
@@ -703,7 +791,7 @@
if (use_lock)
VOID(pthread_mutex_lock(&LOCK_event_arrays));
-
+/**
for (i= 0; i < evex_executing_queue.elements; ++i)
{
event_timed *et= *dynamic_element(&evex_executing_queue, i, event_timed**);
@@ -729,6 +817,25 @@
et->free_sp();
delete_dynamic_element(&events_array, idx);
delete_dynamic_element(&evex_executing_queue, i);
+ // ok, we have cleaned
+ goto done;
+ }
+ }
+**/
+ for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
+ {
+ event_timed *et= *evex_queue_element(&EVEX_EQ_NAME, i, event_timed**);
+ DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?",db->str,name->str, et->dbname.str,
+ et->name.str));
+ if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
+ !sortcmp_lex_string(*db, et->dbname, system_charset_info))
+ {
+ int idx= get_index_dynamic(&events_array, (gptr) et);
+ //we are lucky the event is in the executing queue, no need of second pass
+ //destruct first and then remove. the destructor will delete sp_head
+ et->free_sp();
+ evex_queue_delete_element(&EVEX_EQ_NAME, idx);
+ evex_queue_delete_element(&EVEX_EQ_NAME, i);
// ok, we have cleaned
goto done;
}
--- 1.6/sql/event.h 2005-12-08 15:34:03 +01:00
+++ 1.7/sql/event.h 2005-12-12 21:19:12 +01:00
@@ -16,11 +16,9 @@
#ifndef _EVENT_H_
#define _EVENT_H_
-#include "sp_head.h"
-#include "sp.h"
-
-extern ulong opt_event_executor;
+#include "sp.h"
+#include "sp_head.h"
#define EVEX_OK SP_OK
#define EVEX_KEY_NOT_FOUND SP_KEY_NOT_FOUND
--- 1.6/sql/event_executor.cc 2005-12-08 20:37:48 +01:00
+++ 1.7/sql/event_executor.cc 2005-12-12 21:19:12 +01:00
@@ -14,9 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include "mysql_priv.h"
-#include "event.h"
#include "event_priv.h"
+#include "event.h"
#include "sp.h"
@@ -40,6 +39,7 @@
bool evex_is_running= false;
+ulonglong evex_main_thread_id= 0;
ulong opt_event_executor;
my_bool event_executor_running_global_var= false;
static my_bool evex_mutexes_initted= false;
@@ -74,6 +74,7 @@
pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST);
}
+
int
init_events()
{
@@ -84,7 +85,7 @@
DBUG_PRINT("info",("Starting events main thread"));
evex_init_mutexes();
-
+
VOID(pthread_mutex_lock(&LOCK_evex_running));
evex_is_running= false;
event_executor_running_global_var= false;
@@ -109,6 +110,7 @@
VOID(pthread_mutex_lock(&LOCK_evex_running));
VOID(pthread_mutex_unlock(&LOCK_evex_running));
+
pthread_mutex_destroy(&LOCK_event_arrays);
pthread_mutex_destroy(&LOCK_workers_count);
pthread_mutex_destroy(&LOCK_evex_running);
@@ -182,7 +184,7 @@
if (init_event_thread(thd))
goto err;
-
+
// make this thread invisible it has no vio -> show processlist won't see
thd->system_thread= 1;
@@ -200,7 +202,12 @@
thus data should be freed at later stage.
*/
VOID(my_init_dynamic_array(&events_array, sizeof(event_timed), 50, 100));
+/**
VOID(my_init_dynamic_array(&evex_executing_queue, sizeof(event_timed *), 50, 100));
+**/
+
+ evex_queue_init(&EVEX_EQ_NAME);
+
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
/*
@@ -217,107 +224,132 @@
THD_CHECK_SENTRY(thd);
/* Read queries from the IO/THREAD until this thread is killed */
+ evex_main_thread_id= thd->thread_id;
+
while (!thd->killed)
{
TIME time_now;
my_time_t now;
my_ulonglong cnt;
+ event_timed *et;
DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt));
thd->proc_info = "Sleeping";
- my_sleep(1000000);// sleep 1s
- if (!event_executor_running_global_var)
+ if (!evex_queue_num_elements(EVEX_EQ_NAME) ||
+ !event_executor_running_global_var)
+ {
+ my_sleep(1000000);// sleep 1s
continue;
- time(&now);
- my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
+ }
- VOID(pthread_mutex_lock(&LOCK_event_arrays));
- for (i= 0; (i < evex_executing_queue.elements) && !thd->killed; ++i)
{
- event_timed *et= *dynamic_element(&evex_executing_queue,i,event_timed**);
-// printf("%llu\n", TIME_to_ulonglong_datetime(&et->execute_at));
+ int t2sleep;
+
+
+ /*
+ now let's see how much time to sleep, we know there is at least 1
+ element in the queue.
+ */
+ VOID(pthread_mutex_lock(&LOCK_event_arrays));
+ if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ {
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ continue;
+ }
+ et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
+
+ time(&now);
+ my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
+ t2sleep= evex_time_diff(&et->execute_at, &time_now);
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ if (t2sleep > 0)
+ {
+ sql_print_information("Sleeping for %d seconds.", t2sleep);
+ printf("\nWHEN=%llu NOW=%llu\n", TIME_to_ulonglong_datetime(&et->execute_at), TIME_to_ulonglong_datetime(&time_now));
+ /*
+ We sleep t2sleep seconds but we check every second whether this thread
+ has been killed, or there is new candidate
+ */
+ while (t2sleep-- && !thd->killed &&
+ evex_queue_num_elements(EVEX_EQ_NAME) &&
+ (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
+ my_sleep(1000000);
+ sql_print_information("Finished sleeping");
+ }
if (!event_executor_running_global_var)
- break;
+ continue;
- thd->proc_info = "Iterating";
- THD_CHECK_SENTRY(thd);
- /*
- if this is the first event which is after time_now then no
- more need to iterate over more elements since the array is sorted.
- */
- if (et->execute_at.year > 1969 &&
- my_time_compare(&time_now, &et->execute_at) == -1)
- break;
+ }
+
+
+ VOID(pthread_mutex_lock(&LOCK_event_arrays));
+
+ if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ {
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ continue;
+ }
+ et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
- if (et->status == MYSQL_EVENT_ENABLED &&
- !check_access(thd, EVENT_ACL, et->dbname.str, 0, 0, 0,
- is_schema_db(et->dbname.str)))
- {
- pthread_t th;
+ /*
+ if this is the first event which is after time_now then no
+ more need to iterate over more elements since the array is sorted.
+ */
+ if (et->execute_at.year > 1969 &&
+ my_time_compare(&time_now, &et->execute_at) == -1)
+ {
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ continue;
+ }
+
+ if (et->status == MYSQL_EVENT_ENABLED)
+ {
+ pthread_t th;
- DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num));
- thd->proc_info = "Starting new thread";
- sql_print_information(" Spawning a thread %d", ++iter_num);
+ DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num));
+ sql_print_information(" Spawning a thread %d", ++iter_num);
#ifndef DBUG_FAULTY_THR
- if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
- {
- sql_print_error("Problem while trying to create a thread");
- UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
- }
+ sql_print_information(" Thread is not debuggable!");
+ if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
+ {
+ sql_print_error("Problem while trying to create a thread");
+ UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
+ }
#else
- event_executor_worker((void *) et);
+ event_executor_worker((void *) et);
#endif
- et->mark_last_executed();
- thd->proc_info = "Computing next time";
- et->compute_next_execution_time();
- et->update_fields(thd);
- if ((et->execute_at.year && !et->expression)
- || TIME_to_ulonglong_datetime(&et->execute_at) == 0L)
- et->flags |= EVENT_EXEC_NO_MORE;
- }
+ printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
+ et->mark_last_executed();
+ et->compute_next_execution_time();
+ printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
+ et->update_fields(thd);
+ if ((et->execute_at.year && !et->expression) ||
+ TIME_to_ulonglong_datetime(&et->execute_at) == 0L)
+ et->flags |= EVENT_EXEC_NO_MORE;
}
- /*
- Let's remove elements which won't be executed any more
- The number is "i" and it is <= up to evex_executing_queue.elements
- */
- j= 0;
- while (j < i && j < evex_executing_queue.elements)
+ if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
{
- event_timed *et= *dynamic_element(&evex_executing_queue, j, event_timed**);
- if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
+ evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
+ if (et->dropped)
{
- delete_dynamic_element(&evex_executing_queue, j);
- DBUG_PRINT("EVEX main thread", ("DELETING FROM EXECUTION QUEUE [%s.%s]",
- et->dbname.str, et->name.str));
- // nulling the position, will delete later
- if (et->dropped)
- {
- // we have to drop the event
- int idx;
- et->drop(thd);
- idx= get_index_dynamic(&events_array, (gptr) et);
- DBUG_ASSERT(idx != -1);
- delete_dynamic_element(&events_array, idx);
- }
- continue;
+ // we have to drop the event
+ int idx;
+ et->drop(thd);
+ idx= get_index_dynamic(&events_array, (gptr) et);
+ DBUG_ASSERT(idx != -1);
+ delete_dynamic_element(&events_array, idx);
}
- ++j;
- }
- if (evex_executing_queue.elements)
- //ToDo Andrey : put a lock here
- qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
- evex_executing_queue.elements,
- sizeof(event_timed **),
- (qsort_cmp) event_timed_compare
- );
+ } else
+ evex_queue_first_updated(&EVEX_EQ_NAME);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
- }
+ }// while
err:
// First manifest that this thread does not work and then destroy
VOID(pthread_mutex_lock(&LOCK_evex_running));
- evex_is_running= false;
+ evex_is_running= false;
+ evex_main_thread_id= 0;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
sql_print_information("Event scheduler stopping");
@@ -341,7 +373,8 @@
VOID(pthread_mutex_lock(&LOCK_event_arrays));
// No need to use lock here if EVEX is not running but anyway
- delete_dynamic(&evex_executing_queue);
+ delete_queue(&executing_queue);
+ evex_queue_destroy(&EVEX_EQ_NAME);
delete_dynamic(&events_array);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
@@ -353,8 +386,10 @@
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
+#ifndef DBUG_FAULTY_THR
THD_CHECK_SENTRY(thd);
delete thd;
+#endif
pthread_mutex_unlock(&LOCK_thread_count);
@@ -366,8 +401,10 @@
free_root(&evex_mem_root, MYF(0));
sql_print_information("Event scheduler stopped");
+#ifndef DBUG_FAULTY_THR
my_thread_end();
pthread_exit(0);
+#endif
DBUG_RETURN(0);// Can't return anything here
}
@@ -386,6 +423,7 @@
init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
+#ifndef DBUG_FAULTY_THR
my_thread_init();
if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
@@ -411,6 +449,9 @@
thread_count++;
thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
+#else
+ thd= current_thd;
+#endif
// thd->security_ctx->priv_host is char[MAX_HOSTNAME]
@@ -420,6 +461,8 @@
thd->security_ctx->priv_user= event->definer_user.str;
thd->db= event->dbname.str;
+ if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0,
+ is_schema_db(event->dbname.str)))
{
char exec_time[200];
int ret;
@@ -434,6 +477,7 @@
err:
VOID(pthread_mutex_lock(&LOCK_thread_count));
+#ifndef DBUG_FAULTY_THR
thread_count--;
thread_running--;
/*
@@ -451,6 +495,7 @@
VOID(pthread_mutex_lock(&LOCK_thread_count));
THD_CHECK_SENTRY(thd);
delete thd;
+#endif
VOID(pthread_mutex_unlock(&LOCK_thread_count));
err_no_thd:
@@ -502,6 +547,12 @@
"Table probably corrupted");
goto end;
}
+ if (et->status != MYSQL_EVENT_ENABLED)
+ {
+ DBUG_PRINT("evex_load_events_from_db",("Event %s is disabled", et->name.str));
+ delete et;
+ continue;
+ }
DBUG_PRINT("evex_load_events_from_db",
("Event %s loaded from row. Time to compile", et->name.str));
@@ -515,8 +566,7 @@
// let's find when to be executed
et->compute_next_execution_time();
- DBUG_PRINT("evex_load_events_from_db",
- ("Adding %s to the executor list.", et->name.str));
+ DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list."));
VOID(push_dynamic(&events_array,(gptr) et));
/*
We always add at the end so the number of elements - 1 is the place
@@ -526,23 +576,21 @@
*/
et_copy= dynamic_element(&events_array, events_array.elements - 1,
event_timed*);
- VOID(push_dynamic(&evex_executing_queue,(gptr) &et_copy));
+
+ evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et_copy);
+ printf("%p %s\n", et_copy, et_copy->name.str);
et->free_sphead_on_delete= false;
delete et;
}
- end_read_record(&read_record_info);
- qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
- evex_executing_queue.elements,
- sizeof(event_timed **),
- (qsort_cmp) event_timed_compare
- );
+ ret= 0;
+
+end:
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ end_read_record(&read_record_info);
thd->version--; // Force close to free memory
- ret= 0;
-end:
close_thread_tables(thd);
DBUG_PRINT("info", ("Finishing with status code %d", ret));
--- 1.6/sql/event_priv.h 2005-12-08 15:34:04 +01:00
+++ 1.7/sql/event_priv.h 2005-12-12 21:19:12 +01:00
@@ -16,8 +16,11 @@
#ifndef _EVENT_PRIV_H_
#define _EVENT_PRIV_H_
+#include "mysql_priv.h"
+#define EVEX_USE_QUEUE
+
#define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \
{ VOID(pthread_mutex_unlock(&__mutex)); goto __label; }
@@ -41,14 +44,6 @@
EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */
};
-extern bool evex_is_running;
-extern bool mysql_event_table_exists;
-extern DYNAMIC_ARRAY events_array;
-extern DYNAMIC_ARRAY evex_executing_queue;
-extern MEM_ROOT evex_mem_root;
-extern pthread_mutex_t LOCK_event_arrays,
- LOCK_workers_count,
- LOCK_evex_running;
int
@@ -59,5 +54,72 @@
const LEX_STRING rname, TABLE *table);
TABLE *
-evex_open_event_table(THD *thd, enum thr_lock_type lock_type);
+evex_open_event_table(THD *thd, enum thr_lock_type lock_type);
+
+int
+event_timed_compare_q(void *vptr, byte* a, byte *b);
+
+int
+evex_time_diff(TIME *a, TIME *b);
+
+
+
+#define EXEC_QUEUE_QUEUE_NAME executing_queue
+#define EXEC_QUEUE_DARR_NAME evex_executing_queue
+
+#ifdef EVEX_USE_QUEUE
+ #define EVEX_QUEUE_TYPE QUEUE
+ #define EVEX_PTOQEL byte *
+ #define EVEX_EQ_NAME executing_queue
+
+ #define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
+ #define evex_queue_element(queue, idx, __cast) ((__cast)queue_top(queue))
+ #define evex_queue_delete_element(queue, idx) queue_remove(queue, idx)
+ #define evex_queue_destroy(queue) delete_queue(queue)
+ #define evex_queue_first_updated(queue) queue_replaced(queue)
+ #define evex_queue_insert(queue, element) queue_insert_safe(queue, element);
+
+#else
+ #define EVEX_QUEUE_TYPE DYNAMIC_ARRAY
+ #define EVEX_PTOQEL gptr
+ #define EVEX_EQ_NAME evex_executing_queue
+
+ #define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast)
+ #define evex_queue_delete_element(queue, idx) delete_dynamic_element(queue, idx);
+ #define evex_queue_destroy(queue) delete_dynamic(queue)
+/*
+ push_dynamic() expects ptr to the memory to put in, to make things fast
+ so when a pointer has to be put inside a ptr-to-ptr is being passed
+*/
+ #define evex_queue_first_updated(queue)
+ #define evex_queue_insert(queue, element) VOID(push_dynamic(queue, &element))
+
+
+#endif
+
+
+void
+evex_queue_init(EVEX_QUEUE_TYPE *queue);
+
+int
+evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element);
+
+void
+evex_queue_sort(EVEX_QUEUE_TYPE *queue);
+
+#define evex_queue_num_elements(queue) queue.elements
+
+
+extern bool evex_is_running;
+extern bool mysql_event_table_exists;
+extern DYNAMIC_ARRAY events_array;
+extern DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME;
+extern QUEUE EXEC_QUEUE_QUEUE_NAME;
+extern MEM_ROOT evex_mem_root;
+extern pthread_mutex_t LOCK_event_arrays,
+ LOCK_workers_count,
+ LOCK_evex_running;
+extern ulonglong evex_main_thread_id;
+
+
#endif /* _EVENT_PRIV_H_ */
--- 1.8/sql/event_timed.cc 2005-12-08 20:37:48 +01:00
+++ 1.9/sql/event_timed.cc 2005-12-12 21:19:12 +01:00
@@ -14,9 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include "mysql_priv.h"
-#include "event.h"
#include "event_priv.h"
+#include "event.h"
#include "sp.h"
@@ -789,7 +788,7 @@
+ strlen("DO ") +
+ body.length + strlen(";");
- ret= dst= (char*) alloc_root(thd->mem_root, len);
+ ret= dst= (char*) alloc_root(thd->mem_root, len + 1);
memcpy(dst, "CREATE EVENT ", tmp_len= strlen("CREATE EVENT "));
dst+= tmp_len;
memcpy(dst, dbname.str, tmp_len=dbname.length);
@@ -832,7 +831,7 @@
*dst= '\0';
*length= len;
-
+ dst[len]= '\0';
return ret;
}
| Thread |
|---|
| • bk commit into 5.1 tree (andrey:1.1993) | ahristov | 12 Dec |