List:Internals« Previous MessageNext Message »
From:ahristov Date:December 12 2005 8:19pm
Subject:bk commit into 5.1 tree (andrey:1.1993)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of andrey. When andrey does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1993 05/12/12 21:19:19 andrey@lmy004. +7 -0
  WL#1034 update
  QUEUE implementation working now. This should be more or less ready
  for testing once the debug output has been cleaned up and some things
  around DYNAMIC_ARRAY have been cleaned up.
  - fix handling of errors that used to lead to crashes; no more crashes
    in case of table corruption and such.

  sql/event_timed.cc
    1.9 05/12/12 21:19:12 andrey@lmy004. +3 -4
    allocate one more byte and zeroterminate, really

  sql/event_priv.h
    1.7 05/12/12 21:19:12 andrey@lmy004. +71 -9
    - reorder a bit
    - add macros and functions for queue manipulation which sit on top
     of QUEUE (partly implemented for DYNAMIC_ARRAY too, but that will be
     cleaned up to be QUEUE-only).

  sql/event_executor.cc
    1.7 05/12/12 21:19:12 andrey@lmy004. +137 -89
    reorder

  sql/event.h
    1.7 05/12/12 21:19:12 andrey@lmy004. +2 -4
    reorder

  sql/event.cc
    1.10 05/12/12 21:19:12 andrey@lmy004. +117 -10
    - move mysql_priv.h inclusion to event_priv.h
    - use a priority queue instead of DYNAMIC_ARRAY which is sorted

  mysys/queues.c
    1.17 05/12/12 21:19:12 andrey@lmy004. +66 -1
    add init_queue_ex() implementation
    add queue_insert_safe() implementation

  include/queues.h
    1.13 05/12/12 21:19:12 andrey@lmy004. +6 -0
    Introduce a safe version of queue_insert() that will extend the queue if
    necessary. The auto_extent is passed to the _ex version of init_queue().

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	andrey
# Host:	lmy004.
# Root:	/work/mysql-5.1-tt-copy-works

--- 1.12/include/queues.h	2005-07-18 13:30:08 +02:00
+++ 1.13/include/queues.h	2005-12-12 21:19:12 +01:00
@@ -35,6 +35,7 @@
   uint offset_to_key;			/* compare is done on element+offset */
   int max_at_top;			/* Set if queue_top gives max */
   int  (*compare)(void *, byte *,byte *);
+  uint auto_extent;
 } QUEUE;
 
 #define queue_top(queue) ((queue)->root[1])
@@ -49,14 +50,19 @@
 int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
 	       pbool max_at_top, queue_compare compare,
 	       void *first_cmp_arg);
+int init_queue_ex(QUEUE *queue,uint max_elements,uint offset_to_key,
+	       pbool max_at_top, queue_compare compare,
+	       void *first_cmp_arg, uint auto_extent);
 int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
                  pbool max_at_top, queue_compare compare,
                  void *first_cmp_arg);
 int resize_queue(QUEUE *queue, uint max_elements);
 void delete_queue(QUEUE *queue);
 void queue_insert(QUEUE *queue,byte *element);
+int queue_insert_safe(QUEUE *queue, byte *element);
 byte *queue_remove(QUEUE *queue,uint idx);
 #define queue_remove_all(queue) { (queue)->elements= 0; }
+#define queue_is_full(queue) ((queue)->elements == (queue)->max_elements)
 void _downheap(QUEUE *queue,uint idx);
 void queue_fix(QUEUE *queue);
 #define is_queue_inited(queue) ((queue)->root != 0)

--- 1.16/mysys/queues.c	2005-07-18 13:30:08 +02:00
+++ 1.17/mysys/queues.c	2005-12-12 21:19:12 +01:00
@@ -19,7 +19,7 @@
   Implemention of queues from "Algoritms in C" by Robert Sedgewick.
   An optimisation of _downheap suggested in Exercise 7.51 in "Data
   Structures & Algorithms in C++" by Mark Allen Weiss, Second Edition
+  was implemented by Mikael Ronstrom 2005. Also the O(N) algorithm
   of queue_fix was implemented.
 */
 
@@ -67,6 +67,46 @@
 }
 
 
+
+/*
+  Initialise a queue and record how far it may grow on a safe insert
+
+  SYNOPSIS
+    init_queue_ex()
+    queue		Queue to initialise
+    max_elements	Initial capacity of the queue
+    offset_to_key	Offset to key in element stored in queue
+    max_at_top		Set to 1 if you want biggest element on top.
+    compare		Compare function for elements, takes 3 arguments.
+    first_cmp_arg	First argument to compare function
+    auto_extent	Number of elements queue_insert_safe() adds to a
+			full queue; 0 makes it refuse to insert when
+			the queue is full
+
+  NOTES
+    All setup is delegated to init_queue(); auto_extent is stored only
+    when init_queue() succeeds
+
+  RETURN
+    0	ok
+    other	Value returned by a failing init_queue()
+*/
+
+int init_queue_ex(QUEUE *queue, uint max_elements, uint offset_to_key,
+	       pbool max_at_top, int (*compare) (void *, byte *, byte *),
+	       void *first_cmp_arg, uint auto_extent)
+{
+  int rc;
+  DBUG_ENTER("init_queue_ex");
+
+  rc= init_queue(queue, max_elements, offset_to_key, max_at_top, compare,
+                 first_cmp_arg);
+  if (rc == 0)
+    queue->auto_extent= auto_extent;
+
+  DBUG_RETURN(rc);
+}
+
 /*
   Reinitialize queue for other usage
 
@@ -191,6 +231,31 @@
     queue->root[idx]=element;
   }
 }
+
+/*
+  Insert an element, growing the queue first when it is already full.
+
+  RETURN
+    0   element inserted
+    1   queue was full and resize_queue() reported failure; not inserted
+    2   queue was full and auto_extent is 0; not inserted
+*/
+
+int queue_insert_safe(register QUEUE *queue, byte *element)
+{
+  if (queue->elements == queue->max_elements)
+  {
+    /* Full: growing is only possible when an increment was configured */
+    if (queue->auto_extent == 0)
+      return 2;
+    if (resize_queue(queue, queue->max_elements + queue->auto_extent))
+      return 1;
+  }
+
+  queue_insert(queue, element);
+  return 0;
+}
+
 
 	/* Remove item from queue */
 	/* Returns pointer to removed element */

--- 1.9/sql/event.cc	2005-12-08 20:37:48 +01:00
+++ 1.10/sql/event.cc	2005-12-12 21:19:12 +01:00
@@ -14,16 +14,14 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
-#include "mysql_priv.h"
-#include "event.h"
 #include "event_priv.h"
+#include "event.h"
 #include "sp.h"
 
 /*
  TODO list :
  - The default value of created/modified should not be 0000-00-00 because of
    STRICT mode restricions.
- - Remove m_ prefixes of member variables.
 
  - Use timestamps instead of datetime.
 
@@ -53,9 +51,17 @@
  - Consider using conditional variable when doing shutdown instead of
      waiting till all worker threads end.
  - Make event_timed::get_show_create_event() work
+
  - Add function documentation whenever needed.
+
  - Add logging to file
 
+ - Move comparison code to class event_timed
+
+ - Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY.
+   This will skip copy operation as well as will simplify the code which is
+   now aware of events_array DYNAMIC_ARRAY
+
 Warning:
  - For now parallel execution is not possible because the same sp_head cannot be
    executed few times!!! There is still no lock attached to particular event.
@@ -67,19 +73,60 @@
 
 bool mysql_event_table_exists= 1;
 DYNAMIC_ARRAY events_array;
-DYNAMIC_ARRAY evex_executing_queue;
+DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME;
+QUEUE EXEC_QUEUE_QUEUE_NAME;
 MEM_ROOT evex_mem_root;
 
 
-
 //extern volatile uint thread_running;
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
 ////////////////     Static functions follow ///////////////////////////
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
+/* Set up the executing queue; a failing init_queue_ex() is only logged. */
+void evex_queue_init(EVEX_QUEUE_TYPE *queue)
+{
+#ifdef EVEX_USE_QUEUE
+  /* 100 slots, key at offset 0, max_at_top= 0, grow by 100 when full */
+  if (init_queue_ex(queue, 100, 0, 0, event_timed_compare_q, NULL, 100))
+    sql_print_error("Insufficient memory to initialize executing queue.");
+#else
+  /* DYNAMIC_ARRAY fallback: the array holds event_timed pointers */
+  VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100));
+#endif
+}
 
 
+/* Insert element; QUEUE build returns queue_insert_safe()'s code, else 0. */
+int evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element)
+{
+#ifdef EVEX_USE_QUEUE
+  return queue_insert_safe(queue, element);
+#else
+  VOID(push_dynamic(queue, element));
+  return 0;
+#endif
+}
+
+/* QUEUE build: calls queue_replaced(); DYNAMIC_ARRAY build: does nothing. */
+void evex_queue_top_updated(EVEX_QUEUE_TYPE *queue)
+{
+#ifdef EVEX_USE_QUEUE
+  queue_replaced(queue);
+#endif
+}
+
+/* DYNAMIC_ARRAY build: qsort by event_timed_compare; QUEUE build: no-op. */
+void evex_queue_sort(EVEX_QUEUE_TYPE *queue)
+{
+#ifndef EVEX_USE_QUEUE
+  event_timed **first= dynamic_element(queue, 0, event_timed**);
+
+  qsort((gptr) first, queue->elements, sizeof(event_timed **),
+        (qsort_cmp) event_timed_compare);
+#endif
+}
 
 /* NOTE Andrey: Document better
   Compares two TIME structures.
@@ -98,7 +145,7 @@
 }
 
 
-inline int
+int
 my_time_compare(TIME *a, TIME *b)
 {
 /*
@@ -107,6 +154,7 @@
 */
 
   DBUG_ENTER("my_time_compare");
+
   if (a->year > b->year)
     DBUG_RETURN(1);
   
@@ -143,19 +191,53 @@
   if (a->second < b->second)
     DBUG_RETURN(-1);
 
- /*!!  second_part is not compared !*/
+
+  if (a->second_part > b->second_part)
+    DBUG_RETURN(1);
+  
+  if (a->second_part < b->second_part)
+    DBUG_RETURN(-1);
+
 
   DBUG_RETURN(0);
 }
 
+/* Returns sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b), i.e. a - b. */
+int
+evex_time_diff(TIME *a, TIME *b)
+{
+  DBUG_ENTER("evex_time_diff");
+
+  DBUG_RETURN(sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b));
+}
+
 
 inline int
 event_timed_compare(event_timed **a, event_timed **b)
 {
-  return my_time_compare(&(*a)->execute_at, &(*b)->execute_at);
+  /* Returns 1/-1/0 ordering by execute_at: datetime, then second_part */
+  my_ulonglong a_t= TIME_to_ulonglong_datetime(&(*a)->execute_at);
+  my_ulonglong b_t= TIME_to_ulonglong_datetime(&(*b)->execute_at);
+
+  if (a_t == b_t)
+  {
+    /* Same second: compare second_part unscaled, so nothing can overflow */
+    a_t= (*a)->execute_at.second_part;
+    b_t= (*b)->execute_at.second_part;
+  }
+
+  if (a_t != b_t)
+    return (a_t > b_t) ? 1 : -1;
+  return 0;
 }
 
 
+/* QUEUE comparator adapter: a and b are event_timed pointers; vptr unused */
+int event_timed_compare_q(void *vptr, byte *a, byte *b)
+{
+  event_timed *lhs= (event_timed *) a, *rhs= (event_timed *) b;
+  return event_timed_compare(&lhs, &rhs);
+}
+
 
 /*
   Open mysql.event table for read
@@ -660,7 +742,10 @@
   VOID(push_dynamic(&events_array,(gptr) ett));
   ett_copy= dynamic_element(&events_array, events_array.elements - 1,
                             event_timed*);
+/**
   VOID(push_dynamic(&evex_executing_queue, (gptr) &ett_copy));
+**/
+  evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy);
 
   /*
     There is a copy in the array which we don't need. sphead won't be
@@ -674,11 +759,14 @@
     qsort of events_array.elements (the current number of elements).
     We know that the elements are stored in a contiguous block w/o holes.
   */
+/**
   qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
                                evex_executing_queue.elements,
                                sizeof(event_timed **),
                                (qsort_cmp) event_timed_compare);
-
+**/
+  evex_queue_sort(&EVEX_EQ_NAME);
+  
   if (use_lock)
     VOID(pthread_mutex_unlock(&LOCK_event_arrays));
 
@@ -703,7 +791,7 @@
 
   if (use_lock)
     VOID(pthread_mutex_lock(&LOCK_event_arrays));
- 
+/**
   for (i= 0; i < evex_executing_queue.elements; ++i)
   {
     event_timed *et= *dynamic_element(&evex_executing_queue, i, event_timed**);
@@ -729,6 +817,25 @@
       et->free_sp();
       delete_dynamic_element(&events_array, idx);
       delete_dynamic_element(&evex_executing_queue, i);
+      // ok, we have cleaned
+      goto done;
+    }
+  }
+**/
+  for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
+  {
+    event_timed *et= *evex_queue_element(&EVEX_EQ_NAME, i, event_timed**);
+    DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?",db->str,name->str, et->dbname.str, 
+                        et->name.str));
+    if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
+        !sortcmp_lex_string(*db, et->dbname, system_charset_info))
+    {
+      int idx= get_index_dynamic(&events_array, (gptr) et);
+      //we are lucky the event is in the executing queue, no need of second pass
+      //destruct first and then remove. the destructor will delete sp_head
+      et->free_sp();
+      delete_dynamic_element(&events_array, idx); // idx indexes events_array
+      evex_queue_delete_element(&EVEX_EQ_NAME, i);
       // ok, we have cleaned
       goto done;
     }

--- 1.6/sql/event.h	2005-12-08 15:34:03 +01:00
+++ 1.7/sql/event.h	2005-12-12 21:19:12 +01:00
@@ -16,11 +16,9 @@
 
 #ifndef _EVENT_H_
 #define _EVENT_H_
-#include "sp_head.h"
-#include "sp.h"
-
 
-extern ulong opt_event_executor;
+#include "sp.h"
+#include "sp_head.h"
 
 #define EVEX_OK                 SP_OK
 #define EVEX_KEY_NOT_FOUND      SP_KEY_NOT_FOUND

--- 1.6/sql/event_executor.cc	2005-12-08 20:37:48 +01:00
+++ 1.7/sql/event_executor.cc	2005-12-12 21:19:12 +01:00
@@ -14,9 +14,8 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
-#include "mysql_priv.h"
-#include "event.h"
 #include "event_priv.h"
+#include "event.h"
 #include "sp.h"
 
 
@@ -40,6 +39,7 @@
 
 bool evex_is_running= false;
 
+ulonglong evex_main_thread_id= 0;
 ulong opt_event_executor;
 my_bool event_executor_running_global_var= false;
 static my_bool evex_mutexes_initted= false;
@@ -74,6 +74,7 @@
   pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST);
 }
 
+
 int
 init_events()
 {
@@ -84,7 +85,7 @@
   DBUG_PRINT("info",("Starting events main thread"));
 
   evex_init_mutexes();
-  
+
   VOID(pthread_mutex_lock(&LOCK_evex_running));
   evex_is_running= false;  
   event_executor_running_global_var= false;
@@ -109,6 +110,7 @@
   
   VOID(pthread_mutex_lock(&LOCK_evex_running));
   VOID(pthread_mutex_unlock(&LOCK_evex_running));
+
   pthread_mutex_destroy(&LOCK_event_arrays);
   pthread_mutex_destroy(&LOCK_workers_count);
   pthread_mutex_destroy(&LOCK_evex_running);
@@ -182,7 +184,7 @@
 
   if (init_event_thread(thd))
     goto err;
-
+  
   // make this thread invisible it has no vio -> show processlist won't see
   thd->system_thread= 1;
 
@@ -200,7 +202,12 @@
     thus data should be freed at later stage.
   */
   VOID(my_init_dynamic_array(&events_array, sizeof(event_timed), 50, 100));
+/**
   VOID(my_init_dynamic_array(&evex_executing_queue, sizeof(event_timed *), 50, 100));
+**/
+
+  evex_queue_init(&EVEX_EQ_NAME);
+
   VOID(pthread_mutex_unlock(&LOCK_event_arrays));  
 
   /*
@@ -217,107 +224,132 @@
 
   THD_CHECK_SENTRY(thd);
   /* Read queries from the IO/THREAD until this thread is killed */
+  evex_main_thread_id= thd->thread_id;
+
   while (!thd->killed)
   {
     TIME time_now;
     my_time_t now;
     my_ulonglong cnt;
+    event_timed *et;
     
     DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt));
     thd->proc_info = "Sleeping";
-    my_sleep(1000000);// sleep 1s
-    if (!event_executor_running_global_var)
+    if (!evex_queue_num_elements(EVEX_EQ_NAME) ||
+        !event_executor_running_global_var)
+    {
+      my_sleep(1000000);// sleep 1s
       continue;
-    time(&now);
-    my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
+    }
 
-    VOID(pthread_mutex_lock(&LOCK_event_arrays));
-    for (i= 0; (i < evex_executing_queue.elements) && !thd->killed; ++i)
     {
-      event_timed *et= *dynamic_element(&evex_executing_queue,i,event_timed**);
-//      printf("%llu\n", TIME_to_ulonglong_datetime(&et->execute_at));
+      int t2sleep;
+      
+        
+      /*
+        now let's see how much time to sleep, we know there is at least 1
+        element in the queue.
+      */
+      VOID(pthread_mutex_lock(&LOCK_event_arrays));
+      if (!evex_queue_num_elements(EVEX_EQ_NAME))
+      {
+        VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+        continue;
+      }
+      et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
+        
+      time(&now);
+      my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
+      t2sleep= evex_time_diff(&et->execute_at, &time_now);
+      VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+      if (t2sleep > 0)
+      {
+        sql_print_information("Sleeping for %d seconds.", t2sleep);
+        printf("\nWHEN=%llu   NOW=%llu\n", TIME_to_ulonglong_datetime(&et->execute_at), TIME_to_ulonglong_datetime(&time_now));
+        /*
+          We sleep t2sleep seconds but we check every second whether this thread
+          has been killed, or there is new candidate
+        */
+        while (t2sleep-- && !thd->killed &&
+               evex_queue_num_elements(EVEX_EQ_NAME) &&
+               (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
+          my_sleep(1000000);
+        sql_print_information("Finished sleeping");
+      }
       if (!event_executor_running_global_var)
-        break;
+        continue;
 
-      thd->proc_info = "Iterating";
-      THD_CHECK_SENTRY(thd);
-      /*
-        if this is the first event which is after time_now then no
-        more need to iterate over more elements since the array is sorted.
-      */ 
-      if (et->execute_at.year > 1969 &&
-          my_time_compare(&time_now, &et->execute_at) == -1)
-        break;
+    }
+
+
+    VOID(pthread_mutex_lock(&LOCK_event_arrays));
+
+    if (!evex_queue_num_elements(EVEX_EQ_NAME))
+    {
+      VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+      continue;
+    }
+    et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
       
-      if (et->status == MYSQL_EVENT_ENABLED && 
-          !check_access(thd, EVENT_ACL, et->dbname.str, 0, 0, 0,
-                        is_schema_db(et->dbname.str)))
-      {
-        pthread_t th;
+    /*
+      if this is the first event which is after time_now then no
+      more need to iterate over more elements since the array is sorted.
+    */ 
+    if (et->execute_at.year > 1969 &&
+        my_time_compare(&time_now, &et->execute_at) == -1)
+    {
+      VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+      continue;
+    } 
+      
+    if (et->status == MYSQL_EVENT_ENABLED)
+    {
+      pthread_t th;
 
-        DBUG_PRINT("info", ("  Spawning a thread %d", ++iter_num));
-        thd->proc_info = "Starting new thread";
-        sql_print_information("  Spawning a thread %d", ++iter_num);
+      DBUG_PRINT("info", ("  Spawning a thread %d", ++iter_num));
+      sql_print_information("  Spawning a thread %d", ++iter_num);
 #ifndef DBUG_FAULTY_THR
-        if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
-        {
-          sql_print_error("Problem while trying to create a thread");
-          UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
-        }
+      sql_print_information("  Thread is not debuggable!");
+      if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
+      {
+        sql_print_error("Problem while trying to create a thread");
+        UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
+      }
 #else
-        event_executor_worker((void *) et);
+      event_executor_worker((void *) et);
 #endif
-        et->mark_last_executed();
-        thd->proc_info = "Computing next time";
-        et->compute_next_execution_time();
-        et->update_fields(thd);
-        if ((et->execute_at.year && !et->expression)
-            || TIME_to_ulonglong_datetime(&et->execute_at) == 0L)
-          et->flags |= EVENT_EXEC_NO_MORE;
-      }
+      printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
+      et->mark_last_executed();
+      et->compute_next_execution_time();
+      printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
+      et->update_fields(thd);
+      if ((et->execute_at.year && !et->expression) ||
+           TIME_to_ulonglong_datetime(&et->execute_at) == 0L)
+         et->flags |= EVENT_EXEC_NO_MORE;
     }
-    /*
-      Let's remove elements which won't be executed any more
-      The number is "i" and it is <= up to evex_executing_queue.elements
-    */
-    j= 0;
-    while (j < i && j < evex_executing_queue.elements)
+    if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
     {
-      event_timed *et= *dynamic_element(&evex_executing_queue, j, event_timed**);
-      if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
+      evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
+      if (et->dropped)
       {
-        delete_dynamic_element(&evex_executing_queue, j);
-        DBUG_PRINT("EVEX main thread", ("DELETING FROM EXECUTION QUEUE [%s.%s]", 
-                                         et->dbname.str, et->name.str));
-        // nulling the position, will delete later
-        if (et->dropped)
-        {
-          // we have to drop the event
-          int idx;
-          et->drop(thd);
-          idx= get_index_dynamic(&events_array, (gptr) et);
-          DBUG_ASSERT(idx != -1);
-          delete_dynamic_element(&events_array, idx);
-        }
-        continue;
+        // we have to drop the event
+        int idx;
+        et->drop(thd);
+        idx= get_index_dynamic(&events_array, (gptr) et);
+        DBUG_ASSERT(idx != -1);
+        delete_dynamic_element(&events_array, idx);
       }
-      ++j;
-    }
-    if (evex_executing_queue.elements)
-      //ToDo Andrey : put a lock here
-      qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
-                                   evex_executing_queue.elements,
-                                   sizeof(event_timed **),
-                                   (qsort_cmp) event_timed_compare
-                                 );
+    } else
+        evex_queue_first_updated(&EVEX_EQ_NAME);
 
     VOID(pthread_mutex_unlock(&LOCK_event_arrays));
-  }
+  }// while
 
 err:
   // First manifest that this thread does not work and then destroy
   VOID(pthread_mutex_lock(&LOCK_evex_running));
-  evex_is_running= false;  
+  evex_is_running= false;
+  evex_main_thread_id= 0;
   VOID(pthread_mutex_unlock(&LOCK_evex_running));
 
   sql_print_information("Event scheduler stopping");
@@ -341,7 +373,8 @@
 
   VOID(pthread_mutex_lock(&LOCK_event_arrays));
   // No need to use lock here if EVEX is not running but anyway
-  delete_dynamic(&evex_executing_queue);
+  delete_queue(&executing_queue);
+  evex_queue_destroy(&EVEX_EQ_NAME);
   delete_dynamic(&events_array);
   VOID(pthread_mutex_unlock(&LOCK_event_arrays));
   
@@ -353,8 +386,10 @@
   pthread_mutex_lock(&LOCK_thread_count);
   thread_count--;
   thread_running--;
+#ifndef DBUG_FAULTY_THR
   THD_CHECK_SENTRY(thd);
   delete thd;
+#endif
   pthread_mutex_unlock(&LOCK_thread_count);
 
 
@@ -366,8 +401,10 @@
   free_root(&evex_mem_root, MYF(0));
   sql_print_information("Event scheduler stopped");
 
+#ifndef DBUG_FAULTY_THR
   my_thread_end();
   pthread_exit(0);
+#endif
   DBUG_RETURN(0);// Can't return anything here
 }
 
@@ -386,6 +423,7 @@
 
   init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
 
+#ifndef DBUG_FAULTY_THR
   my_thread_init();
 
   if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
@@ -411,6 +449,9 @@
   thread_count++;
   thread_running++;
   VOID(pthread_mutex_unlock(&LOCK_thread_count));
+#else
+  thd= current_thd;
+#endif
 
   // thd->security_ctx->priv_host is char[MAX_HOSTNAME]
   
@@ -420,6 +461,8 @@
   thd->security_ctx->priv_user= event->definer_user.str;
 
   thd->db= event->dbname.str;
+  if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0,
+                      is_schema_db(event->dbname.str)))
   {
     char exec_time[200];
     int ret;
@@ -434,6 +477,7 @@
 
 err:
   VOID(pthread_mutex_lock(&LOCK_thread_count));
+#ifndef DBUG_FAULTY_THR
   thread_count--;
   thread_running--;
   /*
@@ -451,6 +495,7 @@
   VOID(pthread_mutex_lock(&LOCK_thread_count));
   THD_CHECK_SENTRY(thd);
   delete thd;
+#endif
   VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
 err_no_thd:
@@ -502,6 +547,12 @@
                       "Table probably corrupted");
       goto end;
     }
+    if (et->status != MYSQL_EVENT_ENABLED)
+    {
+      DBUG_PRINT("evex_load_events_from_db",("Event %s is disabled", et->name.str));
+      delete et;
+      continue;
+    }
     
     DBUG_PRINT("evex_load_events_from_db",
             ("Event %s loaded from row. Time to compile", et->name.str));
@@ -515,8 +566,7 @@
     // let's find when to be executed  
     et->compute_next_execution_time();
     
-    DBUG_PRINT("evex_load_events_from_db",
-                ("Adding %s to the executor list.", et->name.str));
+    DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list."));
     VOID(push_dynamic(&events_array,(gptr) et));
     /*
       We always add at the end so the number of elements - 1 is the place
@@ -526,23 +576,21 @@
     */
     et_copy= dynamic_element(&events_array, events_array.elements - 1,
                              event_timed*);
-    VOID(push_dynamic(&evex_executing_queue,(gptr) &et_copy));
+
+    evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et_copy);
+    printf("%p %s\n", et_copy, et_copy->name.str);
     et->free_sphead_on_delete= false;
     delete et; 
   }
-  end_read_record(&read_record_info);
 
-  qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**),
-                               evex_executing_queue.elements,
-                               sizeof(event_timed **),
-                               (qsort_cmp) event_timed_compare
-                              );
+  ret= 0;
+
+end:
   VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+  end_read_record(&read_record_info);
 
   thd->version--;  // Force close to free memory
-  ret= 0;
 
-end:
   close_thread_tables(thd);
 
   DBUG_PRINT("info", ("Finishing with status code %d", ret));

--- 1.6/sql/event_priv.h	2005-12-08 15:34:04 +01:00
+++ 1.7/sql/event_priv.h	2005-12-12 21:19:12 +01:00
@@ -16,8 +16,11 @@
 
 #ifndef _EVENT_PRIV_H_
 #define _EVENT_PRIV_H_
+#include "mysql_priv.h"
 
 
+#define EVEX_USE_QUEUE
+
 #define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \
     { VOID(pthread_mutex_unlock(&__mutex)); goto __label; }
 
@@ -41,14 +44,6 @@
   EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */
 };
 
-extern bool evex_is_running;
-extern bool mysql_event_table_exists;
-extern DYNAMIC_ARRAY events_array;
-extern DYNAMIC_ARRAY evex_executing_queue;
-extern MEM_ROOT evex_mem_root;
-extern pthread_mutex_t LOCK_event_arrays,
-                       LOCK_workers_count,
-                       LOCK_evex_running;
 
 
 int
@@ -59,5 +54,72 @@
                        const LEX_STRING rname, TABLE *table);
                        
 TABLE *
-evex_open_event_table(THD *thd, enum thr_lock_type lock_type);             
+evex_open_event_table(THD *thd, enum thr_lock_type lock_type);
+
+int 
+event_timed_compare_q(void *vptr, byte* a, byte *b);
+
+int
+evex_time_diff(TIME *a, TIME *b);
+
+
+
+#define EXEC_QUEUE_QUEUE_NAME executing_queue
+#define EXEC_QUEUE_DARR_NAME evex_executing_queue
+
+#ifdef EVEX_USE_QUEUE
+ #define EVEX_QUEUE_TYPE QUEUE
+ #define EVEX_PTOQEL byte *
+ #define EVEX_EQ_NAME executing_queue
+ /* idx 0 maps to root[1], the slot queue_top() reads. NOTE(review): confirm */
+ #define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue))
+ #define evex_queue_element(queue, idx, __cast) ((__cast)((queue)->root[(idx) + 1]))
+ #define evex_queue_delete_element(queue, idx)  queue_remove(queue, idx)
+ #define evex_queue_destroy(queue)              delete_queue(queue)
+ #define evex_queue_first_updated(queue)        queue_replaced(queue)
+ #define evex_queue_insert(queue, element)      queue_insert_safe(queue, element)
+ /* No macro ends in ';' so each one is usable as an expression */
+#else
+ #define EVEX_QUEUE_TYPE DYNAMIC_ARRAY
+ #define EVEX_PTOQEL gptr
+ #define EVEX_EQ_NAME evex_executing_queue
+
+ #define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast)
+ #define evex_queue_delete_element(queue, idx)  delete_dynamic_element(queue, idx)
+ #define evex_queue_destroy(queue)              delete_dynamic(queue)
+/*
+  push_dynamic() expects ptr to the memory to put in, to make things fast
+  so when a pointer has to be put inside a ptr-to-ptr is being passed
+*/
+ #define evex_queue_first_updated(queue)
+ #define evex_queue_insert(queue, element)      VOID(push_dynamic(queue, &element))
+ /* NOTE(review): evex_queue_first_element() is not defined for this branch */
+ 
+#endif
+
+
+void
+evex_queue_init(EVEX_QUEUE_TYPE *queue);
+
+int
+evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element);
+
+void
+evex_queue_sort(EVEX_QUEUE_TYPE *queue);
+
+#define evex_queue_num_elements(queue) ((queue).elements)
+
+
+extern bool evex_is_running;
+extern bool mysql_event_table_exists;
+extern DYNAMIC_ARRAY events_array;
+extern DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME;
+extern QUEUE EXEC_QUEUE_QUEUE_NAME;
+extern MEM_ROOT evex_mem_root;
+extern pthread_mutex_t LOCK_event_arrays,
+                       LOCK_workers_count,
+                       LOCK_evex_running;
+extern ulonglong evex_main_thread_id;
+
+
 #endif /* _EVENT_PRIV_H_ */

--- 1.8/sql/event_timed.cc	2005-12-08 20:37:48 +01:00
+++ 1.9/sql/event_timed.cc	2005-12-12 21:19:12 +01:00
@@ -14,9 +14,8 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
-#include "mysql_priv.h"
-#include "event.h"
 #include "event_priv.h"
+#include "event.h"
 #include "sp.h"
 
 
@@ -789,7 +788,7 @@
     + strlen("DO ") +
 	+ body.length + strlen(";");
   
-  ret= dst= (char*) alloc_root(thd->mem_root, len);
+  ret= dst= (char*) alloc_root(thd->mem_root, len + 1);
   memcpy(dst, "CREATE EVENT ", tmp_len= strlen("CREATE EVENT "));
   dst+= tmp_len;
   memcpy(dst, dbname.str, tmp_len=dbname.length);
@@ -832,7 +831,7 @@
   *dst= '\0';
 
   *length= len;
-  
+  ret[len]= '\0'; /* index from ret: dst has already advanced past it */
   return ret;
 }
 
Thread
bk commit into 5.1 tree (andrey:1.1993)ahristov12 Dec