List:Commits« Previous MessageNext Message »
From:damien Date:October 9 2007 10:46pm
Subject:bk commit into 5.2 tree (dkatz:1.2610)
View as plain text  
Below is the list of changes that have just been committed into a local
5.2 repository of dkatz. When dkatz does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-10-09 16:46:06-04:00, dkatz@stripped +2 -0
  wl#441: For design review. Threadpooling/completion port support using libevent.

  sql/scheduler.cc@stripped, 2007-10-09 16:46:04-04:00, dkatz@stripped +395
-0
    wl#441: For design review. Threadpooling/completion port support using libevent.

  sql/scheduler.h@stripped, 2007-10-09 16:46:04-04:00, dkatz@stripped +27 -0
    wl#441: For design review. Threadpooling/completion port support using libevent.

diff -Nrup a/sql/scheduler.cc b/sql/scheduler.cc
--- a/sql/scheduler.cc	2007-02-23 06:13:52 -05:00
+++ b/sql/scheduler.cc	2007-10-09 16:46:04 -04:00
@@ -23,6 +23,7 @@
 
 #include <mysql_priv.h>
 
+
 /*
   'Dummy' functions to be used when we don't need any handling for a scheduler
   event
@@ -86,3 +87,397 @@ void one_thread_per_connection_scheduler
   func->end_thread= one_thread_per_connection_end;
 }
 #endif /* EMBEDDED_LIBRARY */
+
+
+#if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1
+
+#include "event.h"
+
+static uint created_threads, killed_threads;
+static THD* thd_to_add = NULL;
+static THD* thd_to_readd = NULL;
+static THD* event_thd = NULL;
+
+static pthread_mutex_t LOCK_event_loop;
+static pthread_cond_t COND_event_loop;
+
+bool kill_pool_threads = false;
+
+pthread_handler_t libevent_thread_proc(void *arg);
+static void libevent_abort_threads();
+
+void libevent_io_callback(int Fd, short Operation, void* ctx);
+void libevent_signal_callback(int Fd, short Operation, void* ctx);
+
+extern volatile sig_atomic_t event_gotsig; // provided by libevent
+
+/*
+  thd_scheduler keeps the connection between THD and event.
+  It's embedded in the THD class.
+*/
+
+thd_scheduler::thd_scheduler()
+  : waiting(false), logged_in(false)
+{
+  pthread_mutex_init(&lock,MY_MUTEX_INIT_FAST);
+  event = (struct event*)my_malloc(sizeof(struct event), MYF(MY_WME));
+  memset(event, 0, sizeof(*event));
+}
+
+
+thd_scheduler::~thd_scheduler()
+{
+  pthread_mutex_destroy(&lock);
+  my_free(event, MYF(0));
+}
+
+/*
+  Create all threads for the thread pool
+
+  NOTES
+    After threads are created we wait until all threads has signaled that
+    they have started before we return
+
+  RETURN
+    0  ok
+    1  We got an error creating the thread pool
+       In this case we will abort all created threads
+*/
+
+static bool libevent_init(void)
+{
+  uint i;
+  static struct event signal_event;
+  DBUG_ENTER("libevent_init");
+
+  event_init();
+  
+  event_thd = NULL;
+  created_threads= 0;
+  killed_threads= 0;
+  
+  pthread_mutex_init(&LOCK_event_loop, NULL);
+  pthread_cond_init(&COND_event_loop, NULL);
+  
+  event_set(&signal_event, SIGUSR1, EV_SIGNAL|EV_PERSIST,
+            libevent_signal_callback, NULL);
+  
+  /* Set up the thread pool */
+  created_threads= killed_threads= 0;
+  pthread_mutex_lock(&LOCK_thread_count);
+
+  for (i= 0; i < thread_pool_size; i++)
+  {
+    pthread_t thread;
+    int error= pthread_create(&thread, &connection_attrib,
+                              libevent_thread_proc, 0);
+    if (error)
+    {
+      pthread_mutex_unlock(&LOCK_thread_count);
+      sql_print_error("Can't create completion port thread (error %d)",
+                      error);
+      libevent_abort_threads();                       // Cleanup
+      DBUG_RETURN(TRUE);
+    }
+  }
+
+  /* Wait until all threads are created */
+  while (created_threads != thread_pool_size)
+    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
+  pthread_mutex_unlock(&LOCK_thread_count);
+  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
+  DBUG_RETURN(FALSE);
+}
+
+/*
+ This is called whenever data is ready on the fd.
+ What we do is set a global variable to the thd that got the data, and 
+ cause the event_loop to terminate. Then this same thread will return from
+ event_loop and pick the thd value back up for processing.
+*/
+void libevent_io_callback(int Fd, short Operation, void* ctx)
+{
+  event_thd= (THD*)ctx;
+  event_gotsig= 1; /* causes event_loop to terminate. */
+}
+
+/*
+  This is invoked on event_loop whenever our raise(sig) is called.
+  
+  This is used to add new connections to the pool.
+*/
+void libevent_add_thd_callback(int Fd, short Operation, void* /* ctx */)
+{
+  
+  pthread_mutex_lock(&LOCK_thread_count);
+  
+  THD* thd_add= thd_to_add;
+  THD* thd_readd= thd_to_readd;
+  
+  thd_to_add= thd_to_add= NULL;
+  
+  pthread_mutex_unlock(&LOCK_thread_count);
+  
+  if (thd_add || thd_readd)
+  {
+    /* signal we got the connection(s) */
+    pthread_cond_broadcast(&COND_thread_count);
+  }
+  
+  if (thd_add)
+    event_add(thd_add->scheduler.event, NULL);
+  
+  if (thd_readd)
+    event_add(thd_readd->scheduler.event, NULL);
+   
+  if (kill_pool_threads)
+    event_gotsig= 1;                  /* causes event_loop to terminate. */
+}
+
+
+/*
+  Notify the thread pool about a new connection
+
+  NOTES
+    LOCK_thread_count is locked on entry. This function MUST unlock it!
+*/
+
+static void libevent_add_connection(THD *thd)
+{
+  DBUG_ENTER("libevent_add_connection");
+  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
+                       (long) thd, thd->thread_id));
+
+  threads.append(thd);
+
+  event_set(thd->scheduler.event, thd->net.vio->sd, EV_READ, 
+            libevent_io_callback, (void*)thd);
+  
+  while (thd_to_add)
+    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
+  
+  thd_to_add= thd;
+  
+  /* causes the event_loop to invoke libevent_add_thd_callback */
+  raise(SIGUSR1);
+  
+  pthread_mutex_unlock(&LOCK_thread_count);
+  
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Signal a connection it's time to die
+
+  NOTES
+    On entry we have a lock on LOCK_thread_cont
+*/
+
+static void libevent_post_kill_notification(THD *thd)
+{
+  (void) pthread_mutex_lock(&thd->scheduler.lock);
+  if (thd->scheduler.waiting)
+  {
+    /*
+      thd is owned by libevent or it just came out it.
+      Close connection here and cause it to come out of
+      libevent (if its there), then the pool thread will
+      delete thd.
+    */
+    end_connection(thd);
+    close_connection(thd, ER_FORCING_CLOSE, 1);
+  }
+  (void) pthread_mutex_unlock(&thd->scheduler.lock);
+}
+
+
+
+static void libevent_connection_close(THD *thd)
+{
+  DBUG_ENTER("libevent_connection_close");
+  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
+
+  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
+
+  if (thd->net.vio->sd >= 0)                   // not already closed
+  {
+    end_connection(thd);
+    close_connection(thd, 0, 1);
+  }
+  
+  no_threads_end(thd, 0); /* deletes thd */
+
+  DBUG_VOID_RETURN;
+}
+
+
+pthread_handler_t libevent_thread_proc(void *arg)
+{
+  if (init_new_connection_handler_thread())
+  {
+    my_thread_global_end();
+    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
+    exit(1);
+  }
+  DBUG_ENTER("libevent_thread_proc");
+
+  /*
+    Signal libevent_init() when all threads has been created and are ready to
+    receive events.
+  */
+  (void) pthread_mutex_lock(&LOCK_thread_count);
+  created_threads++;
+  if (created_threads == thread_pool_size)
+    (void) pthread_cond_signal(&COND_thread_count);
+  (void) pthread_mutex_unlock(&LOCK_thread_count);
+  
+  while (true)
+  {  
+    static bool event_loop_locked = false;
+    THD* thd;
+    (void) pthread_mutex_lock(&LOCK_event_loop);
+    
+    while (event_loop_locked)
+      pthread_cond_wait(&COND_event_loop, &LOCK_event_loop);
+    
+    if (kill_pool_threads)
+    {
+      /* the flag that we should die has been set */
+      (void) pthread_mutex_unlock(&LOCK_event_loop);
+      (void) pthread_cond_signal(&COND_event_loop); 
+      break;
+    }  
+    
+    event_loop_locked = true;
+    
+    event_loop(EVLOOP_ONCE);
+    
+    thd = event_thd;
+    event_thd = NULL;
+    
+    event_loop_locked = false;
+    
+    (void) pthread_mutex_unlock(&LOCK_event_loop);
+    (void) pthread_cond_signal(&COND_event_loop); /* wake up another thread to
+                                                    process the event loop */
+    
+    if (thd == NULL)
+      continue;       /* nothing to do, loop again */
+    
+    (void) pthread_mutex_lock(&thd->scheduler.lock);
+    thd->scheduler.waiting = false;
+    (void) pthread_mutex_unlock(&thd->scheduler.lock);
+    
+    thd->thread_stack= (char*) &thd;
+
+    if (thd->killed == THD::KILL_CONNECTION ||
+        setup_connection_thread_globals(thd))
+    {
+      libevent_connection_close(thd);
+      continue;
+    }
+
+    if (!thd->scheduler.logged_in)
+    {
+      DBUG_PRINT("info", ("init new connection.  sd: %d",
+                          thd->net.vio->sd));
+      if (login_connection(thd))
+      {
+        /* Failed to log in */
+        libevent_connection_close(thd);
+        continue;
+      }
+      else
+      {
+        /* login successful */
+        thd->scheduler.logged_in = true;
+        prepare_new_connection_state(thd);
+        goto maybe_process_command;
+      }
+    }
+
+    while (true)
+    {
+      /* Process one query */
+      thd->net.no_send_error= 0;
+      if (do_command(thd))
+      {
+        libevent_connection_close(thd);
+        break;
+      }
+
+maybe_process_command:
+      if (thd->net.vio->read_pos < thd->net.vio->read_end)
+      {
+        /*
+          More data in the pipe, process another command.
+
+          Note: we cannot call event_add because the whole
+          request might already be buffered and we wouldn't receive an
+          event.
+        */
+        continue;
+      }
+      /* set up to wait for more data */
+
+      pthread_mutex_lock(&LOCK_thread_count);
+      
+      while (thd_to_readd)  /* a previous request is already waiting, */
+        pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
+
+      thd_to_readd = thd;
+
+      pthread_mutex_unlock(&LOCK_thread_count);
+      raise(SIGUSR1);
+      break;
+    }
+  }
+
+  DBUG_PRINT("exit", ("ending thread"));
+  (void) pthread_mutex_lock(&LOCK_thread_count);
+  killed_threads++;
+  pthread_cond_broadcast(&COND_thread_count);
+  (void) pthread_mutex_unlock(&LOCK_thread_count);
+  my_thread_end();
+  pthread_exit(0);
+  DBUG_RETURN(0);                               /* purify: deadcode */
+}
+
+
+/*
+  Wait until all pool threads has been deleted for clean shutdown
+*/
+
+static void libevent_abort_threads()
+{
+  DBUG_ENTER("libevent_abort_threads");
+  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
+                       created_threads, killed_threads));
+  
+  
+  (void) pthread_mutex_lock(&LOCK_thread_count);
+  kill_pool_threads = true;
+  while (killed_threads != created_threads)
+  {
+    raise(SIGUSR1);
+    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
+  }
+  (void) pthread_mutex_unlock(&LOCK_thread_count);
+  
+  (void) pthread_mutex_destroy(&LOCK_event_loop);
+  (void) pthread_cond_destroy(&COND_event_loop);
+  DBUG_VOID_RETURN;
+}
+
+
+void pool_of_threads_scheduler(scheduler_functions* func)
+{
+  func->max_threads= thread_pool_size;
+  func->init= libevent_init;
+  func->end=  libevent_abort_threads;
+  func->post_kill_notification= libevent_post_kill_notification;
+  func->add_connection= libevent_add_connection;
+}
+
+#endif
diff -Nrup a/sql/scheduler.h b/sql/scheduler.h
--- a/sql/scheduler.h	2007-02-23 06:13:52 -05:00
+++ b/sql/scheduler.h	2007-10-09 16:46:04 -04:00
@@ -53,8 +53,35 @@ enum pool_command_op
   NOT_IN_USE_OP= 0, NORMAL_OP= 1, CONNECT_OP, KILL_OP, DIE_OP
 };
 
+#define HAVE_LIBEVENT
+
+#ifdef HAVE_LIBEVENT
+
+//forward declare
+struct event;
+
+#define HAVE_POOL_OF_THREADS 1
+
+class thd_scheduler
+{
+public:
+  bool waiting;
+  bool logged_in;
+  pthread_mutex_t lock;
+  struct event* event;
+
+  thd_scheduler();
+  ~thd_scheduler();
+};
+
+
+void pool_of_threads_scheduler(scheduler_functions* func);
+
+#else
+
 #define HAVE_POOL_OF_THREADS 0                  /* For easyer tests */
 #define pool_of_threads_scheduler(A) one_thread_per_connection_scheduler(A)
 
 class thd_scheduler
 {};
+#endif /* HAVE_POOL_OF_THREADS */
Thread
bk commit into 5.2 tree (dkatz:1.2610)damien9 Oct