List:Commits« Previous MessageNext Message »
From:damien Date:November 8 2007 6:22pm
Subject:bk commit into 6.0 tree (dkatz:1.2657)
View as plain text  
Below is the list of changes that have just been committed into a local
6.0 repository of dkatz. When dkatz does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-11-08 12:22:50-05:00, dkatz@stripped +2 -0
  WL441: Changed adding a thd to libevent to use a list instead of a pipe and changed how
threads are killed. Various refactorings.

  sql/scheduler.cc@stripped, 2007-11-08 12:22:44-05:00, dkatz@stripped +188
-131
    Changed adding a thd to libevent to use a list instead of a pipe and changed how
threads are killed. Various refactorings.

  sql/scheduler.h@stripped, 2007-11-08 12:22:44-05:00, dkatz@stripped +1 -8
    Changed adding a thd to libevent to use a list instead of a pipe and changed how
threads are killed. Various refactorings.

diff -Nrup a/sql/scheduler.cc b/sql/scheduler.cc
--- a/sql/scheduler.cc	2007-11-02 15:39:00 -04:00
+++ b/sql/scheduler.cc	2007-11-08 12:22:44 -05:00
@@ -94,32 +94,45 @@ void one_thread_per_connection_scheduler
 #include "event.h"
 
 static uint created_threads, killed_threads;
-static int thd_add_pipe[2]; /*pipe to use for adding connections to libevent*/
-static pthread_mutex_t LOCK_event_loop;
-bool kill_pool_threads= FALSE;
+static bool kill_pool_threads;
+
+static struct event thd_add_event;
+static struct event thd_kill_event;
+
+static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
+static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
 
-static LIST *thds_need_processing;
+static int thd_add_pipe[2]; /* pipe to signal add connections to libevent*/
+static int thd_kill_pipe[2]; /* pipe for async killing of THDs in libevent */
+
+/*
+  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
+  event_del) and thds_need_processing and thds_need_adding
+*/
+static pthread_mutex_t LOCK_event_loop;
+static LIST *thds_need_processing; /* list of thds that needs some processing */
+static LIST *thds_all_added; /* list of thds with added events */
 
 pthread_handler_t libevent_thread_proc(void *arg);
-static void libevent_abort_threads();
+static void libevent_end();
 static bool libevent_needs_immediate_processing(THD *thd);
 static void libevent_connection_close(THD *thd);
 static bool libevent_should_close_connection(THD* thd);
-
+static void libevent_add_thd(THD* thd);
 void libevent_io_callback(int Fd, short Operation, void *ctx);
 void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
-void libevent_kill_callback(int Fd, short Operation, void *ctx);
+void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
 
 /*
-  Create a pipe and set to non-blocking. Returns TRUE if there is an error.
+  Create a pipe and set to non-blocking on write.
+  Returns TRUE if there is an error.
 */
+
 static bool init_pipe(int pipe_fds[])
 {
   int flags;
   return pipe(pipe_fds) < 0 ||
-            (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
-            fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
             (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
             fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 }
@@ -131,19 +144,14 @@ static bool init_pipe(int pipe_fds[])
 */
 
 thd_scheduler::thd_scheduler()
-  : logged_in(FALSE), io_event(NULL), kill_event(NULL)
+  : logged_in(FALSE), io_event(NULL)
 {
-  kill_pipe[0]= 0;
-  kill_pipe[1]= 0;
 }
 
 
 thd_scheduler::~thd_scheduler()
-{  
-  if (kill_pipe[0]) close(kill_pipe[0]);
-  if (kill_pipe[1]) close(kill_pipe[1]);
+{
   my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
-  my_free(kill_event, MYF(MY_ALLOW_ZERO_PTR));
 }
 
 
@@ -151,30 +159,22 @@ bool thd_scheduler::init(THD *parent_thd
 {
   io_event=
     (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
-  kill_event=
-    (struct event*)my_malloc(sizeof(*kill_event),MYF(MY_ZEROFILL|MY_WME));
     
-  if (!io_event || !kill_event)
+  if (!io_event)
   {
     sql_print_error("Memory allocation error in thd_scheduler::init\n");
     return TRUE;
   }
-
-  if (init_pipe(kill_pipe))
-  {
-    sql_print_error("init_pipe error in thd_scheduler::init\n");
-    return TRUE;
-  }
   
   event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
-    libevent_io_callback, (void*)parent_thd);
-  event_set(kill_event, kill_pipe[0], EV_READ, 
-    libevent_kill_callback, (void*)parent_thd);
-  list_need_processing.data= parent_thd;
+            libevent_io_callback, (void*)parent_thd);
+    
+  list.data= parent_thd;
   
   return FALSE;
 }
 
+
 /*
   Create all threads for the thread pool
 
@@ -187,34 +187,47 @@ bool thd_scheduler::init(THD *parent_thd
     1  We got an error creating the thread pool
        In this case we will abort all created threads
 */
+
 static bool libevent_init(void)
 {
   uint i;
-  static struct event thd_add_event;
   DBUG_ENTER("libevent_init");
 
   event_init();
   
   created_threads= 0;
   killed_threads= 0;
+  kill_pool_threads= FALSE;
   
   pthread_mutex_init(&LOCK_event_loop, NULL);
+  pthread_mutex_init(&LOCK_thd_add, NULL);
   
-    /* set up the pipe used to add new thds to the event pool */
+  /* set up the pipe used to add new thds to the event pool */
   if (init_pipe(thd_add_pipe))
   {
-    sql_print_error("init_pipe error in libevent_init\n");
-    pthread_mutex_unlock(&LOCK_thread_count);
+    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
+    DBUG_RETURN(1);
+  }
+  /* set up the pipe used to kill thds in the event queue */
+  if (init_pipe(thd_kill_pipe))
+  {
+    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
+    close(thd_add_pipe[0]);
+    close(thd_add_pipe[1]);
     DBUG_RETURN(1);
   }
   event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
-            libevent_add_thd_callback, NULL);          
+            libevent_add_thd_callback, NULL);
+  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
+            libevent_kill_thd_callback, NULL);
  
- if (event_add(&thd_add_event, NULL))
+ if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
  {
    sql_print_error("thd_add_event event_add error in libevent_init\n");
+   libevent_end();
+   DBUG_RETURN(1);
+   
  }
-  
   /* Set up the thread pool */
   created_threads= killed_threads= 0;
   pthread_mutex_lock(&LOCK_thread_count);
@@ -224,12 +237,12 @@ static bool libevent_init(void)
     pthread_t thread;
     int error;
     if ((error= pthread_create(&thread, &connection_attrib,
-                              libevent_thread_proc, 0)))
+                               libevent_thread_proc, 0)))
     {
       sql_print_error("Can't create completion port thread (error %d)",
                       error);
       pthread_mutex_unlock(&LOCK_thread_count);
-      libevent_abort_threads();                       // Cleanup
+      libevent_end();                      // Cleanup
       DBUG_RETURN(TRUE);
     }
   }
@@ -254,13 +267,13 @@ static bool libevent_init(void)
     cause the libevent event_loop() to terminate. Then this same thread will
     return from event_loop and pick the thd value back up for processing.
 */
+
 void libevent_io_callback(int, short, void *ctx)
 {    
   safe_mutex_assert_owner(&LOCK_event_loop);
   THD *thd= (THD*)ctx;
-  event_del(thd->scheduler.kill_event);
-  thds_need_processing=
-      list_add(thds_need_processing, &thd->scheduler.list_need_processing);
+  thds_all_added= list_delete(thds_all_added, &thd->scheduler.list);
+  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 }
 
 /*
@@ -269,66 +282,89 @@ void libevent_io_callback(int, short, vo
   NOTES
     This is only called by the thread that owns LOCK_event_loop.
 */
-void libevent_kill_callback(int Fd, short, void *ctx)
+
+void libevent_kill_thd_callback(int Fd, short, void*)
 {    
   safe_mutex_assert_owner(&LOCK_event_loop);
-  THD *thd= (THD*)ctx;
-  char c;
-  read(Fd, &c, sizeof(c));
-  event_del(thd->scheduler.io_event);
-  thds_need_processing=
-      list_add(thds_need_processing, &thd->scheduler.list_need_processing);
+  THD *thd;
+  if (read(Fd, &thd, sizeof(thd)) != sizeof(thd))
+  {
+    sql_print_error("read error in libevent_kill_thd_callback\n");
+    return;
+  }
+  
+  pthread_mutex_lock(&LOCK_thd_add);
+  LIST* list= thds_all_added;
+  while (list)
+  {
+    if (list->data == (void*)thd)
+    {
+      /* 
+        Found the THD, we know it's still valid and still being watched by
+        libevent. Delete from libevent and add to the processing queue.
+      */
+      event_del(thd->scheduler.io_event);
+      thds_all_added= list_delete(thds_all_added, &thd->scheduler.list);
+      thds_need_processing= list_add(thds_need_processing,
+                                     &thd->scheduler.list);
+      break;
+    }  
+    list= list_rest(list);
+  }
+  pthread_mutex_unlock(&LOCK_thd_add);
 }
 
 
 /*
-  This is used to add connections to the pool. This callback is invoked from the
-  libevent event_loop() call whenever the thd_add_pipe[1] pipe is written too.
+  This is used to add connections to the pool. This callback is invoked from
+  the libevent event_loop() call whenever the thd_add_pipe[1] pipe is written
+  too.
   
   NOTES
     This is only called by the thread that owns LOCK_event_loop.
 */
+
 void libevent_add_thd_callback(int Fd, short, void *)
 { 
   safe_mutex_assert_owner(&LOCK_event_loop);
   
-  THD *thd;
-  if (read(Fd, &thd, sizeof(THD*)) != sizeof(THD*))
-  {
-    sql_print_error("Error reading from pipe in libevent_add_thd_callback\n");
-    return;
-  }
-  
-  /* thd can be NULL during shutdown, to "wake-up" event_loop() */
-  if (thd == NULL)
-    return;
+  char c;
+  read(Fd, &c, sizeof(c)); /* clears the event */
   
-  if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
-  {
-    /*
-      Add thd to thds_need_processing list. If it needs closing we'll close it
-      outside of event_loop().
-    */
-    thds_need_processing=
-        list_add(thds_need_processing, &thd->scheduler.list_need_processing);
-  }
-  else
+  pthread_mutex_lock(&LOCK_thd_add);
+  while (thds_need_adding)
   {
-    /* add the events to libevent */
-    if (event_add(thd->scheduler.io_event, NULL))
+    /* pop the first thd off the list */
+    THD* thd= (THD*)thds_need_adding->data;
+    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
+
+    pthread_mutex_unlock(&LOCK_thd_add);
+    
+    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
     {
-      sql_print_error("io_event event_add error in libevent_add_thd_callback\n");
-      libevent_connection_close(thd);
-      return;
+      /*
+        Add thd to thds_need_processing list. If it needs closing we'll close it
+        outside of event_loop().
+      */
+      thds_need_processing= list_add(thds_need_processing,
+                                     &thd->scheduler.list);
     }
-    if (event_add(thd->scheduler.kill_event, NULL))
+    else
     {
-      sql_print_error("kill_event event_add error in libevent_add_thd_callback\n");
-      event_del(thd->scheduler.io_event);
-      libevent_connection_close(thd);
-      return;
+      /* add to libevent */
+      if (event_add(thd->scheduler.io_event, NULL))
+      {
+        sql_print_error("event_add error in libevent_add_thd_callback\n");
+        libevent_connection_close(thd);
+      } 
+      else
+      {
+        thds_all_added= list_add(thds_all_added, &thd->scheduler.list);
+      }
     }
+    pthread_mutex_lock(&LOCK_thd_add);
   }
+  pthread_mutex_unlock(&LOCK_thd_add);
 }
 
 
@@ -338,49 +374,42 @@ void libevent_add_thd_callback(int Fd, s
   NOTES
     LOCK_thread_count is locked on entry. This function MUST unlock it!
 */
-static void libevent_add_new_connection(THD *thd)
+
+static void libevent_add_connection(THD *thd)
 {
-  DBUG_ENTER("libevent_add_new_connection");
+  DBUG_ENTER("libevent_add_connection");
   DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
                        (long) thd, thd->thread_id));
   
   if (thd->scheduler.init(thd))
   {
-      sql_print_error("Scheduler init error in libevent_add_new_connection\n");
-      goto err;
+    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
+    pthread_mutex_unlock(&LOCK_thread_count);
+    libevent_connection_close(thd);
+    DBUG_VOID_RETURN;
   }
-  
   threads.append(thd);
+  libevent_add_thd(thd);
   
-  /* causes the event_loop to invoke libevent_add_thd_callback */
-  if (write(thd_add_pipe[1], &thd, sizeof(THD*)) != sizeof(THD*))
-  {  
-    sql_print_error("Pipe error in libevent_add_new_connection\n");
-    goto err;
-  }
-  
-  pthread_mutex_unlock(&LOCK_thread_count);
-  DBUG_VOID_RETURN;
-  
-err:
   pthread_mutex_unlock(&LOCK_thread_count);
-  libevent_connection_close(thd);
   DBUG_VOID_RETURN;
 }
 
 
-/*
-  Signal a connection it's time to die
-
-  NOTES
-    On entry we have a lock on LOCK_thread_cont
+/**
+  @brief Signal a waiting connection it's time to die.
+ 
+  @details This function will signal libevent that the thread should be killed.
+    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
+    upon entry.
+ 
+  @param[in]  thd The connection to kill
 */
 
 static void libevent_post_kill_notification(THD *thd)
 {
-  char c= 0;
-  if(write(thd->scheduler.kill_pipe[1], &c, sizeof(c)) != sizeof(c))
-  {  
+  if(write(thd_kill_pipe[1], &thd, sizeof(thd)) != sizeof(thd))
+  {
     sql_print_error("Pipe error in libevent_post_kill_notification\n");
   }
 }
@@ -389,6 +418,7 @@ static void libevent_post_kill_notificat
 /*
   Close and delete a connection.
 */
+
 static void libevent_connection_close(THD *thd)
 {
   DBUG_ENTER("libevent_connection_close");
@@ -402,14 +432,17 @@ static void libevent_connection_close(TH
     close_connection(thd, 0, 1);
   }
   
-  no_threads_end(thd, 0); /* deletes thd */
+  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
+  pthread_mutex_unlock(&LOCK_thread_count);
 
   DBUG_VOID_RETURN;
 }
 
+
 /*
   Returns true if we should close and delete a THD connection.
 */
+
 static bool libevent_should_close_connection(THD* thd)
 {
   return thd->net.error ||
@@ -417,6 +450,12 @@ static bool libevent_should_close_connec
       thd->killed == THD::KILL_CONNECTION;
 }
 
+
+/*
+  libevent_thread_proc is the outer loop of each thread in the thread pool.
+  These procs only return/terminate on shutdown (kill_pool_threads == true).
+*/
+
 pthread_handler_t libevent_thread_proc(void *arg)
 {
   if (init_new_connection_handler_thread())
@@ -456,8 +495,8 @@ pthread_handler_t libevent_thread_proc(v
     
     /* pop the first thd off the list */
     thd= (THD*)thds_need_processing->data;
-    thds_need_processing= 
-        list_delete(thds_need_processing, thds_need_processing);
+    thds_need_processing= list_delete(thds_need_processing,
+                                      thds_need_processing);
     
     (void) pthread_mutex_unlock(&LOCK_event_loop);
     
@@ -520,10 +559,12 @@ thread_exit:
   DBUG_RETURN(0);                               /* purify: deadcode */
 }
 
+
 /*
   Returns TRUE if the connection needs immediate processing and FALSE if 
   instead it's queued for libevent processing or closed,
 */
+
 static bool libevent_needs_immediate_processing(THD *thd)
 {
   if (libevent_should_close_connection(thd))
@@ -540,30 +581,40 @@ static bool libevent_needs_immediate_pro
   if (thd->net.vio == 0 || thd->net.vio->read_pos <
thd->net.vio->read_end)
     return TRUE;
     
-  /*
-    Send to be queued for libevent.
-    The mutex is to protect thd_add_pipe,
-    which can otherwise be simutaneously written-to by multiple threads and
-    from add_connection (which also holds LOCK_thread_count when writing).
-  */  
-  pthread_mutex_lock(&LOCK_thread_count);
-  bool added_to_pipe= write(thd_add_pipe[1], &thd, sizeof(THD*)) == sizeof(THD*);
-  pthread_mutex_unlock(&LOCK_thread_count);
-  if (!added_to_pipe)
-  {
-    /* Things must be really backed up. Close the connection. */
-    sql_print_error("Pipe error in libevent_needs_immediate_processing\n");
-    libevent_connection_close(thd);
-  }
+  libevent_add_thd(thd);
   return FALSE;
 }
 
+
+/*
+  Adds a THD to queued for libevent processing.
+  
+  This call does not actually register the event with libevent.
+  Instead, it places the THD onto a queue and signals libevent by writing
+  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
+  be invoked which will find the THD on the queue and add it to libevent.
+*/
+
+static void libevent_thd_add(THD* thd)
+{
+  char c=0;
+  pthread_mutex_lock(&LOCK_thd_add);
+  /* queue for libevent */
+  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
+  thd->mysys_var= NULL;
+  /* notify libevent */
+  write(thd_add_pipe[1], &c, sizeof(c));
+  pthread_mutex_unlock(&LOCK_thd_add);
+}
+
+
 /*
   Wait until all pool threads have been deleted for clean shutdown
 */
-static void libevent_abort_threads()
+
+static void libevent_end()
 {
-  DBUG_ENTER("libevent_abort_threads");
+  DBUG_ENTER("libevent_end");
   DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
                        created_threads, killed_threads));
   
@@ -573,17 +624,23 @@ static void libevent_abort_threads()
   kill_pool_threads= TRUE;
   while (killed_threads != created_threads)
   {
-    /* write a null to wake up the event loop */
-    void *p=NULL;
-    write(thd_add_pipe[1], &p, sizeof(p));
+    /* wake up the event loop */
+    char c= 0;
+    write(thd_add_pipe[1], &c, sizeof(c));
       
     pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
   }
   (void) pthread_mutex_unlock(&LOCK_thread_count);
   
+  event_del(&thd_add_event);
   close(thd_add_pipe[0]);
   close(thd_add_pipe[1]);
+  event_del(&thd_kill_event);
+  close(thd_kill_pipe[0]);
+  close(thd_kill_pipe[1]);
+
   (void) pthread_mutex_destroy(&LOCK_event_loop);
+  (void) pthread_mutex_destroy(&LOCK_thd_add);
   DBUG_VOID_RETURN;
 }
 
@@ -592,9 +649,9 @@ void pool_of_threads_scheduler(scheduler
 {
   func->max_threads= thread_pool_size;
   func->init= libevent_init;
-  func->end=  libevent_abort_threads;
+  func->end=  libevent_end;
   func->post_kill_notification= libevent_post_kill_notification;
-  func->add_connection= libevent_add_new_connection;
+  func->add_connection= libevent_add_connection;
 }
 
 #endif
diff -Nrup a/sql/scheduler.h b/sql/scheduler.h
--- a/sql/scheduler.h	2007-11-01 14:37:48 -04:00
+++ b/sql/scheduler.h	2007-11-08 12:22:44 -05:00
@@ -48,11 +48,6 @@ enum scheduler_types
 void one_thread_per_connection_scheduler(scheduler_functions* func);
 void one_thread_scheduler(scheduler_functions* func);
 
-enum pool_command_op
-{
-  NOT_IN_USE_OP= 0, NORMAL_OP= 1, CONNECT_OP, KILL_OP, DIE_OP
-};
-
 #if defined(HAVE_LIBEVENT) && !defined(EMBEDDED_LIBRARY)
 
 #define HAVE_POOL_OF_THREADS 1
@@ -64,9 +59,7 @@ class thd_scheduler
 public:
   bool logged_in;
   struct event* io_event;
-  struct event* kill_event;
-  int kill_pipe[2];
-  LIST list_need_processing;
+  LIST list;
 
   thd_scheduler();
   ~thd_scheduler();
Thread
bk commit into 6.0 tree (dkatz:1.2657)damien8 Nov