List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:January 22 2010 10:14am
Subject:bzr commit into mysql-5.5-next-mr branch (mikael:3000)
View as plain text  
#At file:///home/mikael/mysql_clones/trans_wl5136/ based on revid:mikael@stripped

 3000 Mikael Ronstrom	2010-01-22
      Moved to priority queueing with started transactions having prio over non-started transactions

    modified:
      sql/scheduler_epoll.cc
      sql/scheduler_poll.cc
      sql/scheduler_thread_pool.cc
      sql/scheduler_thread_pool.h
=== modified file 'sql/scheduler_epoll.cc'
--- a/sql/scheduler_epoll.cc	2010-01-20 15:19:39 +0000
+++ b/sql/scheduler_epoll.cc	2010-01-22 10:14:49 +0000
@@ -150,7 +150,7 @@ static int tp_group_low_level_wait_for_e
     if (tp_shutdown != 0)
       break;
 
-    if (events_total == 1)
+    if (events_total == 1 && !is_query_available(my_tp_group))
     {
       /*
         Single thread can handle work load...
@@ -160,13 +160,13 @@ static int tp_group_low_level_wait_for_e
                 (tp_client_low_level_t)event_list[0].data.ptr;
 
       /* Process the event */
-      tp_process_event(my_thread_data, FALSE);
+      tp_process_event(my_thread_data);
       my_thread_data->client_low_level_cntx= NULL;
       events_total= 0;
     }
     else
     {
-      if (events_total > 1)
+      if (events_total > 0)
       {
         /* Put all clients that are ready into the "ready" list */
         for (int loop= 0; loop < events_total; loop++)

=== modified file 'sql/scheduler_poll.cc'
--- a/sql/scheduler_poll.cc	2010-01-20 22:52:49 +0000
+++ b/sql/scheduler_poll.cc	2010-01-22 10:14:49 +0000
@@ -392,7 +392,7 @@ static int tp_group_low_level_wait_for_e
     if (events_total <= 0)
       continue;
 
-    if (events_total == 1)
+    if (events_total == 1 && !is_query_available(my_tp_group))
     {
       /*
         Single thread can handle work load...
@@ -416,7 +416,7 @@ static int tp_group_low_level_wait_for_e
                   tp_group_low_level_cntx->low_level_cntx[client_index];
 
             /* Process the event */
-            tp_process_event(my_thread_data, FALSE);
+            tp_process_event(my_thread_data);
             my_thread_data->client_low_level_cntx= NULL;
             events_total= 0;
             break;
@@ -427,7 +427,7 @@ static int tp_group_low_level_wait_for_e
     }
     else
     {
-      if (events_total > 1)
+      if (events_total > 0)
       {
         /* Put all clients that are ready into the "ready" list */
         for (client_count= 0; client_count < clients_valid; client_count++)

=== modified file 'sql/scheduler_thread_pool.cc'
--- a/sql/scheduler_thread_pool.cc	2010-01-20 22:59:40 +0000
+++ b/sql/scheduler_thread_pool.cc	2010-01-22 10:14:49 +0000
@@ -59,6 +59,13 @@ typedef struct _tp_group_low_level_s *tp
  /* thread pool client low level */
 typedef struct _tp_client_low_level_s *tp_client_low_level_t;
 
+enum tp_start_type
+{
+  TP_NO_START_REQUIRED = 0,
+  TP_MUST_START_ONE = 1,
+  TP_MUST_START = 2
+};
+
 enum wake_level
 {
   WAKE_IF_CONSUMER = 0,
@@ -77,31 +84,24 @@ typedef struct _tp_thread_s
 typedef struct _tp_group_events_s
 {
   tp_client_low_level_t event_ready_array[MAX_EVENTS_PER_WAIT_CALL];
-  int                   event_max;
-  int                   event_total;
-  int                   event_current;
 } tp_group_events_t;
 
 typedef struct _tp_group_s
 {
   mysql_mutex_t         LOCK_group;
-  mysql_cond_t          COND_producer;
   mysql_cond_t          COND_consumer;
   mysql_cond_t          COND_reserve;
   my_atomic_rwlock_t    LOCK_threads_active;
 
   int                   group_idx;
   tp_group_low_level_t  group_low_level_cntx;
-  tp_group_events_t     tp_events[MAX_EVENT_GROUPS];
   tp_client_low_level_t first_queued_query;
   tp_client_low_level_t last_queued_query;
-  tp_client_low_level_t first_ready_query;
-  tp_client_low_level_t last_ready_query;
+  tp_client_low_level_t first_queued_trans;
+  tp_client_low_level_t last_queued_trans;
+  int                   queued_events;
+  int                   trans_events;
   uint64                check_count;
-  int                   num_active_transactions;
-  int                   max_active_transactions;
-  int                   events_index_produce;
-  int                   events_index_consume;
   int                   threads_user_request;
   int                   max_active_threads;
   int                   threads_in_group;
@@ -124,7 +124,8 @@ static int tp_group_low_level_wait_for_e
         tp_group_low_level_t tp_group_low_level_cntx,
         tp_group_t *my_tp_group,
         tp_thread_t *my_thread_data,
-        tp_group_events_t *next_events);
+        tp_group_events_t *next_events,
+        bool return_with_events);
 static int tp_group_low_level_waiter_wake(tp_group_t *my_tp_group);
 static tp_group_t *tp_group_low_level_get_tp_group(
         tp_group_low_level_t tp_group_low_level_cntx);
@@ -155,22 +156,12 @@ static int tp_client_low_level_end(
 
 /*********************************************************************/
 /******* Transaction queueing interface *******/
-static void check_ready_query(tp_group_t *my_tp_group);
+static void check_trans_queue(tp_group_t *my_tp_group);
 static void insert_queued_query(tp_group_t *my_tp_group,
                                 tp_client_low_level_t client_cntx);
-static tp_client_low_level_t get_oldest_queued_query(tp_group_t *my_tp_group);
-static void check_execute_queued_query(tp_group_t *my_tp_group,
-                                       tp_thread_t *my_thread_data);
-static void execute_ready_query(tp_group_t *my_tp_group,
-                                tp_thread_t *my_thread_data,
-                                my_bool exec_queued);
-static int check_active_transactions_before(tp_group_t *my_tp_group,
-                                            tp_thread_t *my_thread_data,
-                                            my_bool must_start);
-static int check_active_transactions_after(tp_group_t *my_tp_group,
-                                           tp_thread_t *my_thread_data,
-                                           THD *thd,
-                                           my_bool must_start);
+static void insert_queued_trans(tp_group_t *my_tp_group,
+                                tp_client_low_level_t client_cntx);
+static tp_client_low_level_t get_highest_priority_query(tp_group_t *my_tp_group);
 
 /*********************************************************************/
 /******* GENERAL *******/
@@ -197,7 +188,6 @@ static ulonglong      NANOSEC_PER_MILLI=
 
 #ifdef HAVE_PSI_INTERFACE
 static PSI_mutex_key         key_LOCK_group;
-static PSI_cond_key          key_COND_producer;
 static PSI_cond_key          key_COND_consumer;
 static PSI_cond_key          key_COND_reserve;
 
@@ -215,7 +205,6 @@ static PSI_mutex_info all_tp_mutexes[]=
 
 static PSI_cond_info all_tp_conds[]=
 {
-  { &key_COND_producer, "tp_group_t::COND_producer", 0},
   { &key_COND_consumer, "tp_group_t::COND_consumer", 0},
   { &key_COND_reserve, "tp_group_t::COND_reserve", 0},
   { &key_tp_COND_stall_check, "tp_LOCK_stall_check", PSI_FLAG_GLOBAL},
@@ -267,6 +256,45 @@ pthread_key(PSI_thread*, THR_PSI_backup)
 #define usleep(a) Sleep(max(1, a/1000))
 #endif
 
+/*********************************************************************/
+static int tp_create_worker_threads(tp_group_t *cur_group, int new_threads)
+{
+  int new_count= 0;
+  int thread_loop, error;
+  tp_thread_t *cur_thread= NULL;
+
+  /* Start threads to deal with a group of sockets/clients */
+  for (thread_loop= cur_group->threads_in_group;
+        (thread_loop < MAX_THREADS_PER_GROUP) &&
+        (new_count < new_threads);
+        thread_loop++)
+  {
+    cur_thread= &(cur_group->group_threads[thread_loop]);
+
+    if (cur_thread->thread_id == 0)
+    {
+      cur_thread->thread_tp_group= cur_group->group_idx;
+      cur_thread->client_low_level_cntx= NULL;
+      error= mysql_thread_create(key_tp_worker_thread,
+              &(cur_thread->thread_id),
+              &connection_attrib,
+              tp_worker_thread_main, cur_thread);
+      if (error)
+      {
+        sql_print_error("Can't create pool thread (error %d, errno: %d)",
+                error, socket_errno);
+        break;
+      }
+      else
+      {
+        cur_group->threads_in_group++;
+        new_count++;
+      }
+    }
+  }
+  return new_count;
+}
+
 static int
 get_active_threads(tp_group_t *tp_group)
 {
@@ -392,32 +420,20 @@ set_tp_psi_env()
 #define set_user_psi_env(thd)
 #define set_tp_psi_env()
 #endif
+
 /*********************************************************************/
-static inline void tp_process_event(tp_thread_t *my_thread_data,
-                                    my_bool must_start)
+static void tp_process_event(tp_thread_t *my_thread_data)
 {
   THD* thd;
   tp_group_t *my_tp_group;
-  my_bool success= FALSE;
+  my_bool is_active= FALSE;
+  int execute_again;
 
   DBUG_ENTER("tp_process_event");
   my_tp_group= &(tp_group_list[my_thread_data->thread_tp_group]);
 
-  /*
-    Check if the connection is actively executing a transaction already.
-    In this case we allow it to proceed without check on transactions.
-    If the connection hasn't started a transaction yet, then we perform
-    checks to see if it is desirable to put the thread on a queue instead
-    to execute later.
-  */
-  if (check_active_transactions_before(my_tp_group,
-                                       my_thread_data,
-                                       must_start))
-    return;
   add_active_threads(my_tp_group, +1);
 
-execute_one_query:
-
   thd= tp_client_low_level_get_thd(my_thread_data->client_low_level_cntx);
   my_thread_data->process_count= 0;
   tp_client_low_level_set_processing_thread(
@@ -435,7 +451,7 @@ execute_one_query:
 
   set_user_psi_env(thd); /* Set perfschema accounting to user thread */
   /* Calls close_connection() on failure */
-  if (setup_connection_thread_globals(thd))
+  if (unlikely(setup_connection_thread_globals(thd)))
   {
     tp_full_cleanup(my_thread_data, 0);
     goto end;
@@ -452,13 +468,13 @@ execute_one_query:
   {
     Vio *vio;
     thd->net.reading_or_writing= 0;
-    if (do_command(thd) || check_failed_connection(thd))
+    if (unlikely(do_command(thd) || check_failed_connection(thd)))
     {
       tp_full_cleanup(my_thread_data, 0);
       goto end;
     }
     vio= thd->net.vio;
-    if (!vio->has_data(vio))
+    if (likely(!vio->has_data(vio)))
       break;
     /*
       There is cached data already read off of the socket,
@@ -468,68 +484,32 @@ execute_one_query:
     */
   } while (1);
   my_thread_data->process_count= PROCESS_COUNT_INIT;
+
   /* Indicate we're reading again (waiting for events) */
   thd->net.reading_or_writing= 1;
 
-  if (tp_client_low_level_rearm(my_thread_data->client_low_level_cntx) != 0)
+  /*
+    This is the limit at which we can no longer use the thd object.
+    As soon as we rearm the socket, another thread can pick up a 
+    query on the socket and start executing it.
+    
+    Since rearming it means making a syscall, we should be safe that
+    a memory barrier occurs before the next thread picks up the thd
+    object and thus see our changes to it.
+  */
+  if (unlikely(tp_client_low_level_rearm(my_thread_data->client_low_level_cntx)
+               != 0))
   {
     tp_full_cleanup(my_thread_data, ER_ERROR_ON_READ);
     goto end;
   }
-  success= TRUE;
 end:
   set_tp_psi_env(); /* Set perfschema accounting to thread pool */
-  if (!success)
-    thd= NULL;
-  if (check_active_transactions_after(my_tp_group,
-                                      my_thread_data,
-                                      thd,
-                                      must_start))
-    goto execute_one_query;
   add_active_threads(my_tp_group, -1);
   DBUG_VOID_RETURN;
 }
 
 /*********************************************************************/
-static int tp_create_worker_threads(tp_group_t *cur_group, int new_threads)
-{
-  int new_count= 0;
-  int thread_loop, error;
-  tp_thread_t *cur_thread= NULL;
-
-  /* Start threads to deal with a group of sockets/clients */
-  for (thread_loop= cur_group->threads_in_group;
-        (thread_loop < MAX_THREADS_PER_GROUP) &&
-        (new_count < new_threads);
-        thread_loop++)
-  {
-    cur_thread= &(cur_group->group_threads[thread_loop]);
-
-    if (cur_thread->thread_id == 0)
-    {
-      cur_thread->thread_tp_group= cur_group->group_idx;
-      cur_thread->client_low_level_cntx= NULL;
-      error= mysql_thread_create(key_tp_worker_thread,
-              &(cur_thread->thread_id),
-              &connection_attrib,
-              tp_worker_thread_main, cur_thread);
-      if (error)
-      {
-        sql_print_error("Can't create pool thread (error %d, errno: %d)",
-                error, socket_errno);
-        break;
-      }
-      else
-      {
-        cur_group->threads_in_group++;
-        new_count++;
-      }
-    }
-  }
-  return new_count;
-}
-
-/*********************************************************************/
 static int tp_wake_thread(tp_group_t *my_tp_group,
                           wake_level level)
 {
@@ -570,20 +550,105 @@ static int tp_wake_thread(tp_group_t *my
   return new_count;
 }
 
-/*********************************************************************/
-static void tp_worker_thread_finish(void *arg)
+static bool
+is_transaction_active(tp_client_low_level_t client_cntx)
 {
-  tp_group_t *my_tp_group;
-  my_tp_group= (tp_group_t *) arg;
+  THD *thd= tp_client_low_level_get_thd(client_cntx);
+  return thd->transaction.is_active();
+}
+
+static void
+queue_query(tp_group_t *my_tp_group,
+            tp_client_low_level_t client_cntx)
+{
+  if (is_transaction_active(client_cntx))
+    insert_queued_query(my_tp_group, client_cntx);
+  else
+    insert_queued_trans(my_tp_group, client_cntx);
+}
+
+static void
+handle_waiting_thread(tp_group_t *my_tp_group,
+                      tp_thread_t *my_thread_data)
+{
+  tp_group_events_t next_events;
+  int next_index;
+  int events_ready;
+
+  /* This thread will be the waiting thread. */
+  my_tp_group->waiting_thread= my_thread_data;
 
-  my_tp_group->threads_initialized--;
+  /*
+    my_tp_group->waiting_thread is the "lock state" that protects
+    my_tp_group->events_index_produce from having multiple threads
+    accesing it. Thus we don't need to hold a pthread lock any longer.
+    Unlock the group before I wait for new tp_events
+  */
   mysql_mutex_unlock(&my_tp_group->LOCK_group);
-  mysql_cond_broadcast(&my_tp_group->COND_producer);
-  mysql_cond_broadcast(&my_tp_group->COND_consumer);
-  mysql_cond_broadcast(&my_tp_group->COND_reserve);
-  my_thread_end();
+
+  /*
+    Wait for file descriptors to have something of interest
+    NOTE: This call may process an event if a single fd has IO ready
+    this may deadlock if we don't have correct "going to sleep"
+    callbacks, this optimisation is for situations with only a single
+    connection actively pursuing queries. If there is a query
+    available when we arrive here it isn't likely to be this situation
+    we're in.
+  */
+  events_ready= tp_group_low_level_wait_for_events(
+              my_tp_group->group_low_level_cntx,
+              my_tp_group,
+              my_thread_data,
+              &next_events,
+              is_query_available);
+
+  /* Lock the group so I can figure out what to do next */
+  mysql_mutex_lock(&my_tp_group->LOCK_group);
+
+  if (my_tp_group->waiting_thread == my_thread_data)
+  {
+    /* I'm no longer the waiting thread */
+    my_tp_group->waiting_thread= NULL;
+  }
+
+  for (i= 0; i < events_ready; i++)
+    queue_query(my_tp_group, next_events.events_array[i]);
+}
+
+static int
+is_credit_available(tp_group_t *my_tp_group)
+{
+  return (get_active_threads(my_tp_group) <
+          my_tp_group->max_active_threads);
+}
+
+static int
+is_query_available(tp_group_t *my_tp_group)
+{
+  if (my_tp_group->first_queued_query ||
+      my_tp_group->first_queued_trans)
+    return TRUE;
+  return FALSE;
 }
 
+static void
+process_query(tp_group_t *my_tp_group,
+              tp_thread_t *my_thread_data,
+              tp_group_events_t *cur_events)
+{
+  my_thread_data->client_low_level_cntx=
+    get_highest_priority_query(my_tp_group);
+ 
+  /* Unlock the group before I process an event */
+  mysql_mutex_unlock(&my_tp_group->LOCK_group);
+
+  /* Process the event */
+  tp_process_event(my_thread_data);
+  my_thread_data->client_low_level_cntx= NULL;
+
+  /* Lock the group so I can update my completion status */
+  mysql_mutex_lock(&my_tp_group->LOCK_group);
+}
 
 /* Main function for all pool threads */
 /*********************************************************************/
@@ -591,7 +656,6 @@ pthread_handler_t tp_worker_thread_main(
 {
   tp_thread_t *my_thread_data;
   tp_group_t *my_tp_group;
-  tp_group_events_t *cur_events;
 
   DBUG_ENTER("tp_worker_thread_main");
 
@@ -623,213 +687,77 @@ pthread_handler_t tp_worker_thread_main(
   /* START MAIN FOREVER LOOP */
   while (tp_shutdown == 0)
   {
-
-    /* Make sure we have a thread waiting for new tp_events */
-    if (my_tp_group->waiting_thread == NULL)
-    {
-      tp_group_events_t *next_events;
-      int next_index;
-      int events_ready;
-
-      /* This thread will be the waiting thread. */
-      my_tp_group->waiting_thread= my_thread_data;
-
-      next_index= my_tp_group->events_index_produce + 1;
-      next_index&= MAX_EVENT_GROUP_MASK;
-
-      /*
-        Wait for consumers to catch up so I don't use an event group
-        that already has data in it
-      */
-      while (
-          /* Next event group has events to process */
-          (next_index == my_tp_group->events_index_consume) &&
-          /* I'm not supposed to be shutting down */
-          (tp_shutdown == 0))
-      {
-        /* Get another thread to help process this list of tp_events */
-        tp_wake_thread(my_tp_group, WAKE_OR_CREATE_ONE);
-        (void) mysql_cond_wait(&my_tp_group->COND_producer,
-                &my_tp_group->LOCK_group);
-      }
-
-      /* Wake another thread to help process this list of tp_events */
-      tp_wake_thread(my_tp_group, WAKE_IF_CONSUMER);
-
-      /*
-        my_tp_group->waiting_thread is the "lock state" that protects
-        my_tp_group->events_index_produce from having multiple threads
-        accesing it. Thus we don't need to hold a pthread lock any longer.
-        Unlock the group before I wait for new tp_events
-      */
-      mysql_mutex_unlock(&my_tp_group->LOCK_group);
-
-      /* Now I have an available event group */
-      next_events=
-            &(my_tp_group->tp_events[my_tp_group->events_index_produce]);
-      /* Erase it in preparation of being used */
-      next_events->event_total= 0;
-      next_events->event_current= 0;
-
-      /*
-        Wait for file descriptors to have something of interest
-        NOTE: This call may process an event if a single fd has IO ready
-        this may deadlock if we don't have correct "going to sleep"
-        callbacks
-      */
-      events_ready= tp_group_low_level_wait_for_events(
-                  my_tp_group->group_low_level_cntx,
-                  my_tp_group,
-                  my_thread_data,
-                  next_events);
-
-      /* Lock the group so I can figure out what to do next */
-      mysql_mutex_lock(&my_tp_group->LOCK_group);
-
-      if (my_tp_group->waiting_thread == my_thread_data)
-      {
-        /* I'm no longer the waiting thread */
-        my_tp_group->waiting_thread= NULL;
-      }
-
-      if (events_ready > 0)
-      {
-        /* Produce events for processing */
-        next_events->event_total= events_ready;
-        next_events->event_current= 0;
-        /* Move to the next event group for producing tp_events */
-        my_tp_group->events_index_produce= next_index;
-      }
-
-    }
-    check_execute_queued_query(my_tp_group, my_thread_data);
-
-    /* Get the current event group in case it changed when I was busy */
-    cur_events=
-          &(my_tp_group->tp_events[my_tp_group->events_index_consume]);
-
     /*
       Nothing to do and enough threads waiting to consume, so lets get into
       the reserve group that will RARELY need scheduled.
       This is an optimization to avoid too many context switches when the
       pool group has MANY threads and FEW clients.
+
+      We will become a reserve thread if the following holds true:
+      1) There is a waiting thread
+      2) There are no queries available
+      3) There are enough threads waiting for something to do execute
+      4) There is no shutdown ongoing
     */
-    while (
-          /* Current event group is consumed */
-          (cur_events->event_current == cur_events->event_total) &&
-          /* Next event group not ready */
-          (my_tp_group->events_index_produce ==
-                  my_tp_group->events_index_consume) &&
-          /* Someone is waiting for new IO */
-          (my_tp_group->waiting_thread != NULL) &&
-          /* Already enough threads waiting for something to do */
-          (my_tp_group->threads_for_consumer >=
+    while ((!is_query_available(my_tp_group)) &&
+           (my_tp_group->threads_for_consumer >=
             my_tp_group->threads_user_request) &&
-          /* I'm not supposed to be shutting down */
-          (tp_shutdown == 0))
+           (tp_shutdown == 0))
     {
-      /* I'll be a reserve thread */
-      my_tp_group->threads_for_reserve++;
-      (void) mysql_cond_wait(&my_tp_group->COND_reserve,
-              &my_tp_group->LOCK_group);
-      /* Get the current event group in case it changed when I was busy */
-      cur_events=
-            &(my_tp_group->tp_events[my_tp_group->events_index_consume]);
-      my_tp_group->threads_for_reserve--;
+      if (my_tp_group->waiting_thread == NULL)
+        handle_waiting_thread(my_tp_group, my_thread_data);
+      else
+      {
+        /* I'll be a reserve thread */
+        my_tp_group->threads_for_reserve++;
+        (void) mysql_cond_wait(&my_tp_group->COND_reserve,
+                &my_tp_group->LOCK_group);
+        /* Get the current event group in case it changed when I was busy */
+        cur_events= get_cur_events(my_tp_group);
+        my_tp_group->threads_for_reserve--;
+      }
     }
 
-    /* Wait for something to do */
-    while (
-          /* No events pending Or no credits remain */
-          (
-            /* No events pending */
-            (
-              /* Current event group is consumed */
-              (cur_events->event_current == cur_events->event_total) &&
-              /* Next event group not ready */
-              (my_tp_group->events_index_produce ==
-                      my_tp_group->events_index_consume)
-            ) ||
-            /* no credits remaining */
-            (get_active_threads(my_tp_group) >=
-            my_tp_group->max_active_threads)
-          ) &&
-          /* Someone is waiting for new IO */
-          (my_tp_group->waiting_thread != NULL) &&
-          /* I'm not supposed to be shutting down */
-          (tp_shutdown == 0))
+    /*
+      Wait for something to do as long as the following is true:
+      1) There is a waiting thread
+      2) There are no queries available OR there is no credit to execute
+      3) Shutdown isn't ordered
+    */
+    while ((!(is_query_available(my_tp_group) &&
+              is_credit_available(my_tp_group))) &&
+           (tp_shutdown == 0))
     {
-      /* I'll be a consumer thread */
-      my_tp_group->threads_for_consumer++;
-      (void) mysql_cond_wait(&my_tp_group->COND_consumer,
-              &my_tp_group->LOCK_group);
-      execute_ready_query(my_tp_group, my_thread_data, FALSE);
-      /* Get the current event group in case it changed when I was busy */
-      cur_events=
-            &(my_tp_group->tp_events[my_tp_group->events_index_consume]);
-      my_tp_group->threads_for_consumer--;
+      if (my_tp_group->waiting_thread == NULL)
+        handle_waiting_thread(my_tp_group, my_thread_data);
+      else
+      {
+        /* I'll be a consumer thread */
+        my_tp_group->threads_for_consumer++;
+        (void) mysql_cond_wait(&my_tp_group->COND_consumer,
+                &my_tp_group->LOCK_group);
+        /* Get the current event group in case it changed when I was busy */
+        my_tp_group->threads_for_consumer--;
+      }
     }
 
     /* We need to exit */
     if (tp_shutdown != 0)
       break;
 
-    /* We need to process tp_events */
-    while (
-          /* Current event group has more to consume */
-          (cur_events->event_current < cur_events->event_total) ||
-          /* Next event group has tp_events to consume */
-          (my_tp_group->events_index_produce !=
-                  my_tp_group->events_index_consume))
+    /*
+      We need to process tp_events as long as:
+      1) Queries available
+      2) Credit available
+      3) Shutdown isn't ordered
+    */
+    do
     {
-      /* Should we move to the next event group for consuming? */
-      if ((cur_events->event_current == cur_events->event_total) &&
-          (my_tp_group->events_index_consume !=
-                  my_tp_group->events_index_produce))
-      {
-        /* Move to the next event group for consuming tp_events */
-        my_tp_group->events_index_consume++;
-        my_tp_group->events_index_consume&= MAX_EVENT_GROUP_MASK;
-        cur_events=
-            &(my_tp_group->tp_events[my_tp_group->events_index_consume]);
-        mysql_cond_signal(&my_tp_group->COND_producer);
-      }
-
-      if (cur_events->event_current < cur_events->event_total)
-      {
-        /* Get a message for me to process */
-        my_thread_data->client_low_level_cntx=
-                    cur_events->event_ready_array[cur_events->event_current];
-        cur_events->event_current++;
-
-        if (
-              /* Current event group is NOT consumed */
-              (cur_events->event_current != cur_events->event_total) ||
-              /* Next event group ready */
-              (my_tp_group->events_index_produce !=
-                      my_tp_group->events_index_consume) ||
-              /* No one is waiting for new IO */
-              (my_tp_group->waiting_thread == NULL))
-        {
-          /* Wake another thread to help process this list of tp_events */
-          tp_wake_thread(my_tp_group, WAKE_IF_CONSUMER);
-        }
-
-        /* Unlock the group before I process an event */
-        mysql_mutex_unlock(&my_tp_group->LOCK_group);
-
-        /* Process the event */
-        tp_process_event(my_thread_data, FALSE);
-        my_thread_data->client_low_level_cntx= NULL;
-
-        /* Lock the group so I can update my completion status */
-        mysql_mutex_lock(&my_tp_group->LOCK_group);
-      }
-      /* Get the current event group in case it changed while I was busy */
-      cur_events=
-            &(my_tp_group->tp_events[my_tp_group->events_index_consume]);
-    }
-    check_execute_queued_query(my_tp_group, my_thread_data);
+      process_query(my_tp_group, my_thread_data);
+      /* Get the current event group in case it changed when I was busy */
+    } while (is_query_available(my_tp_group) &&
+             is_credit_available(my_tp_group) &&
+             (tp_shutdown == 0));
   }
   DBUG_PRINT("exit", ("ending thread"));
 #ifndef _WIN32
@@ -977,7 +905,7 @@ pthread_handler_t tp_stall_check_thread_
       */
       cur_group->max_active_threads= cur_group->threads_user_request +
                                      threads_stalled;
-      check_ready_query(cur_group);
+      check_trans_queue(cur_group);
       mysql_mutex_unlock(&cur_group->LOCK_group);
     }
 
@@ -1038,7 +966,6 @@ void thd_pool_end(void)
     {
       cur_group= &(tp_group_list[group_loop]);
 
-      mysql_cond_broadcast(&(cur_group->COND_producer));
       mysql_cond_broadcast(&(cur_group->COND_consumer));
       mysql_cond_broadcast(&(cur_group->COND_reserve));
 
@@ -1072,7 +999,6 @@ void thd_pool_end(void)
     }
     my_atomic_rwlock_destroy(&cur_group->LOCK_threads_active);
     mysql_mutex_destroy(&cur_group->LOCK_group);
-    mysql_cond_destroy(&cur_group->COND_producer);
     mysql_cond_destroy(&cur_group->COND_consumer);
     mysql_cond_destroy(&cur_group->COND_reserve);
   }
@@ -1137,7 +1063,6 @@ bool thd_pool_init(void)
     /* Set initial values for all group members */
     mysql_mutex_init(key_LOCK_group,
                      &cur_group->LOCK_group, MY_MUTEX_INIT_FAST);
-    mysql_cond_init(key_COND_producer, &cur_group->COND_producer, NULL);
     mysql_cond_init(key_COND_consumer, &cur_group->COND_consumer, NULL);
     mysql_cond_init(key_COND_reserve, &cur_group->COND_reserve, NULL);
     my_atomic_rwlock_init(&cur_group->LOCK_threads_active);
@@ -1145,21 +1070,6 @@ bool thd_pool_init(void)
     cur_group->group_idx= group_loop;
     cur_group->group_low_level_cntx= NULL;
 
-    for (events_loop= 0; events_loop < MAX_EVENT_GROUPS; events_loop++)
-    {
-      cur_group->tp_events[events_loop].event_max= MAX_EVENTS_PER_WAIT_CALL;
-      for (client_loop= 0;
-            client_loop < MAX_EVENTS_PER_WAIT_CALL;
-            client_loop++)
-      {
-        cur_group->tp_events[events_loop].event_ready_array[client_loop]= NULL;
-      }
-      cur_group->tp_events[events_loop].event_total= 0;
-      cur_group->tp_events[events_loop].event_current= 0;
-    }
-    cur_group->events_index_produce= 0;
-    cur_group->events_index_consume= 0;
-
     cur_group->threads_user_request= 0;
     cur_group->max_active_threads= 0;
     cur_group->threads_in_group= 0;
@@ -1167,12 +1077,12 @@ bool thd_pool_init(void)
     cur_group->threads_for_consumer= 0;
     cur_group->threads_for_reserve= 0;
     cur_group->threads_active= 0;
-    cur_group->num_active_transactions= 0;
-    cur_group->max_active_transactions= 1;
     cur_group->first_queued_query= NULL;
     cur_group->last_queued_query= NULL;
-    cur_group->first_ready_query= NULL;
-    cur_group->last_ready_query= NULL;
+    cur_group->first_queued_trans= NULL;
+    cur_group->last_queued_trans= NULL;
+    cur_group->query_events= 0;
+    cur_group->trans_events= 0;
 
     for (thread_loop= 0;
           thread_loop < MAX_THREADS_PER_GROUP;
@@ -1407,21 +1317,53 @@ void thd_pool_wait_end(THD *thd)
 #endif
 
 /* Module to handle transaction queueing */
+static void
+analyse_lists(tp_group_t *my_tp_group)
+{
+  int queury_events= 0;
+  int trans_events= 0;
+  tp_client_low_level_t client_cntx, prev_client_cntx;
+
+  client_cntx= my_tp_group->first_queued_query;
+  prev_client_cntx= client_cntx;
+  while (client_cntx)
+  {
+    prev_client_cntx= client_cntx;
+    query_events++;
+    client_cntx= client_cntx->next_client_query;
+  }
+  DBUG_ASSERT(my_tp_group->last_queued_query == prev_client_cntx);
+  DBUG_ASSERT(my_tp_group->query_events == query_events);
+
+  client_cntx= my_tp_group->first_queued_trans;
+  prev_client_cntx= client_cntx;
+  while (client_cntx)
+  {
+    prev_client_cntx= client_cntx;
+    trans_events++;
+    client_cntx= client_cntx->next_client_query;
+  }
+  DBUG_ASSERT(my_tp_group->last_queued_trans == prev_client_cntx);
+  DBUG_ASSERT(my_tp_group->trans_events == trans_events);
+}
+
 static tp_client_low_level_t
-remove_ready_query(tp_group_t *my_tp_group)
+remove_queued_trans(tp_group_t *my_tp_group)
 {
   tp_client_low_level_t client_cntx, next_client_cntx;
-  DBUG_ENTER("remove_ready_query");
+  DBUG_ENTER("remove_queued_trans");
 
-  if (!my_tp_group->first_ready_query)
+  if (!my_tp_group->first_queued_trans)
     DBUG_RETURN(NULL);
-  client_cntx= my_tp_group->first_ready_query;
-  next_client_cntx= (tp_client_low_level_t)client_cntx->next_client_query;
+  my_tp_group->trans_events--;
+  client_cntx= my_tp_group->first_queued_trans;
+  next_client_cntx= client_cntx->next_client_query;
   client_cntx->next_client_query= NULL;
-  my_tp_group->first_ready_query= next_client_cntx;
+  my_tp_group->first_queued_trans= next_client_cntx;
   if (!next_client_cntx)
-    my_tp_group->last_ready_query= NULL;
+    my_tp_group->last_queued_trans= NULL;
   client_cntx->wait_count= 0;
+  analyse_lists(my_tp_group);
   DBUG_RETURN(client_cntx);
 }
 
@@ -1433,6 +1375,7 @@ remove_queued_query(tp_group_t *my_tp_gr
 
   if (!my_tp_group->first_queued_query)
     DBUG_RETURN(NULL);
+  my_tp_group->query_events--;
   client_cntx= my_tp_group->first_queued_query;
   next_client_cntx= client_cntx->next_client_query;
   client_cntx->next_client_query= NULL;
@@ -1440,25 +1383,29 @@ remove_queued_query(tp_group_t *my_tp_gr
   if (!next_client_cntx)
     my_tp_group->last_queued_query= NULL;
   client_cntx->wait_count= 0;
+  analyse_lists(my_tp_group);
   DBUG_RETURN(client_cntx);
 }
 
 static void
-insert_ready_query(tp_group_t *my_tp_group,
-                   tp_client_low_level_t client_cntx)
+insert_queued_trans(tp_group_t *my_tp_group,
+                    tp_client_low_level_t client_cntx)
 {
-  DBUG_ENTER("insert_ready_query");
-  if (my_tp_group->last_ready_query)
+  DBUG_ENTER("insert_queued_trans");
+  DBUG_PRINT("enter", ("Queueing client_cntx = 0x%x", client_cntx));
+  my_tp_group->trans_events++;
+  if (my_tp_group->last_queued_trans)
   {
-    my_tp_group->last_ready_query->next_client_query= client_cntx;
-    my_tp_group->last_ready_query= client_cntx;
+    my_tp_group->last_queued_trans->next_client_query= client_cntx;
+    my_tp_group->last_queued_trans= client_cntx;
   }
   else
   {
-    my_tp_group->first_ready_query= client_cntx;
-    my_tp_group->last_ready_query= client_cntx;
-    client_cntx->next_client_query= NULL;
+    my_tp_group->first_queued_trans= client_cntx;
+    my_tp_group->last_queued_trans= client_cntx;
   }
+  client_cntx->next_client_query= NULL;
+  analyse_lists(my_tp_group);
   DBUG_VOID_RETURN;
 }
 
@@ -1470,6 +1417,7 @@ insert_queued_query(tp_group_t *my_tp_gr
   DBUG_PRINT("enter", ("Queueing client_cntx = 0x%x", client_cntx));
 
   client_cntx->wait_count= my_tp_group->check_count;
+  my_tp_group->queury_events++;
   if (my_tp_group->last_queued_query)
   {
     my_tp_group->last_queued_query->next_client_query= client_cntx;
@@ -1479,217 +1427,50 @@ insert_queued_query(tp_group_t *my_tp_gr
   {
     my_tp_group->first_queued_query= client_cntx;
     my_tp_group->last_queued_query= client_cntx;
-    client_cntx->next_client_query= NULL;
   }
+  client_cntx->next_client_query= NULL;
+  analyse_lists(my_tp_group);
   DBUG_VOID_RETURN;
 }
 
 #define MAX_TP_TRANS_WAIT 30
 static void
-check_ready_query(tp_group_t *my_tp_group)
+check_trans_queue(tp_group_t *my_tp_group)
 {
   uint64 check_count, wait_count;
   tp_client_low_level_t client_cntx;
-  DBUG_ENTER("check_ready_query");
+  DBUG_ENTER("check_trans_queue");
 
-  client_cntx= my_tp_group->first_queued_query;
+  client_cntx= my_tp_group->first_queued_trans;
   check_count= my_tp_group->check_count;
   if (client_cntx &&
       (check_count - client_cntx->wait_count) > MAX_TP_TRANS_WAIT)
   {
     wait_count= client_cntx->wait_count;
-    remove_queued_query(my_tp_group);
-    insert_ready_query(my_tp_group, client_cntx);
+    remove_queued_trans(my_tp_group);
+    insert_queued_query(my_tp_group, client_cntx);
     client_cntx->wait_count= wait_count;
   }
   check_count++;
   my_tp_group->check_count= check_count;
-  if (my_tp_group->first_ready_query)
+  if (my_tp_group->first_queued_trans || my_tp_group->first_queued_query)
   {
     DBUG_PRINT("info", ("Wake ready query in group %d",
                my_tp_group->group_idx));
     tp_wake_thread(my_tp_group, WAKE_OR_CREATE_ONE);
   }
-  if (my_tp_group->first_queued_query)
-  {
-    DBUG_PRINT("info", ("Wake queued query in group %d",
-               my_tp_group->group_idx));
-    tp_wake_thread(my_tp_group, WAKE_IF_CREATED);
-  }
   DBUG_VOID_RETURN;
 }
 
 static tp_client_low_level_t
-get_oldest_queued_query(tp_group_t *my_tp_group)
+get_highest_priority_query(tp_group_t *my_tp_group)
 {
-  DBUG_ENTER("get_oldest_queued_query");
+  DBUG_ENTER("get_highest_priority_query");
 
   tp_client_low_level_t client_cntx;
-  if ((client_cntx= remove_ready_query(my_tp_group)) ||
-      (client_cntx= remove_queued_query(my_tp_group)))
+  if ((client_cntx= remove_queued_query(my_tp_group)) ||
+      (client_cntx= remove_queued_trans(my_tp_group)))
     DBUG_RETURN(client_cntx);
   DBUG_RETURN(NULL);
 }
-
-static void
-execute_ready_query(tp_group_t *my_tp_group,
-                    tp_thread_t *my_thread_data,
-                    my_bool exec_queued)
-{
-  tp_client_low_level_t client_cntx;
-  DBUG_ENTER("execute_ready_query");
-
-  if (!tp_shutdown)
-  {
-    if ((client_cntx= remove_ready_query(my_tp_group)) ||
-        (exec_queued &&
-         (client_cntx= remove_queued_query(my_tp_group))))
-    {
-      /*
-        A transaction has been held off for too long, it needs to be executed
-        before we fetch more data from the network. We ensure another thread
-        gets immediately woken up to ensure we don't deadlock in this
-        situation.
-      */
-      tp_wake_thread(my_tp_group, WAKE_IF_CONSUMER);
-      my_thread_data->client_low_level_cntx= client_cntx;
-      mysql_mutex_unlock(&my_tp_group->LOCK_group);
-      /* Process the event */
-      tp_process_event(my_thread_data, TRUE);
-      my_thread_data->client_low_level_cntx= NULL;
-      mysql_mutex_lock(&my_tp_group->LOCK_group);
-    }
-  }
-  DBUG_VOID_RETURN;
-}
-
-static int
-check_active_transactions_before(tp_group_t *my_tp_group,
-                                 tp_thread_t *my_thread_data,
-                                 my_bool must_start)
-{
-  bool ret_flag= FALSE;
-  THD *thd= tp_client_low_level_get_thd(my_thread_data->client_low_level_cntx);
-  DBUG_ENTER("check_active_transactions_before");
-
-  if (!thd->transaction.is_active())
-  {
-    DBUG_PRINT("info", ("Transaction not active before"));
-    mysql_mutex_lock(&my_tp_group->LOCK_group);
-    if (!must_start)
-    {
-      if (my_tp_group->num_active_transactions >=
-          my_tp_group->max_active_transactions &&
-          get_active_threads(my_tp_group) >=
-          my_tp_group->max_active_threads)
-      {
-        /*
-          Too many active transactions, we need to stall this and start it
-          up later, either through it being blocked for too long, or by
-          the number of active transactions decreasing to a level where it
-          is ok to start another transaction again.
-
-          We only stall when there is already enough active threads in this
-          group.
-        */
-        insert_queued_query(my_tp_group,
-                            my_thread_data->client_low_level_cntx);
-        ret_flag= TRUE;
-      }
-      else
-      {
-        tp_client_low_level_t new_client_cntx;
-        my_tp_group->num_active_transactions++;
-        /*
-          Before we start this new transaction we need to ensure that there are
-          no waiting queries to be executed. If this is the case we need to
-          execute them first.
-        */
-        new_client_cntx= get_oldest_queued_query(my_tp_group);
-        if (new_client_cntx)
-        {
-          insert_queued_query(my_tp_group,
-                              my_thread_data->client_low_level_cntx);
-          my_thread_data->client_low_level_cntx= new_client_cntx;
-        }
-      }
-    }
-    else
-    {
-      my_tp_group->num_active_transactions++;
-      DBUG_PRINT("info", ("Transaction active before"));
-    }
-    mysql_mutex_unlock(&my_tp_group->LOCK_group);
-  }
-  DBUG_RETURN(ret_flag);
-}
-
-static int
-check_active_transactions_after(tp_group_t *my_tp_group,
-                                tp_thread_t *my_thread_data,
-                                THD *thd,
-                                my_bool must_start)
-{
-  DBUG_ENTER("check_active_transactions_after");
-  if (!thd || !thd->transaction.is_active())
-  {
-    tp_client_low_level_t client_cntx= NULL;
-    DBUG_PRINT("info", ("Transaction not active after"));
-    /*
-      The query also completed a user transaction, decrease the number
-      of active transactions and if the new current number of transactions
-      goes below the max number of active transactions we should execute
-      the oldest queued query in the list on this group.
-    */
-    mysql_mutex_lock(&my_tp_group->LOCK_group);
-    if (my_tp_group->num_active_transactions <=
-        my_tp_group->max_active_transactions)
-    {
-      /*
-        For waiting threads we never want to execute more than one query
-        before returning to serve as waiting thread.
-        Also execute_ready_query only executes one query.
-      */
-      if (must_start ||
-          my_tp_group->waiting_thread == my_thread_data)
-        client_cntx= NULL;
-      else
-        client_cntx= get_oldest_queued_query(my_tp_group);
-    }
-    if (!client_cntx)
-      my_tp_group->num_active_transactions--;
-    mysql_mutex_unlock(&my_tp_group->LOCK_group);
-    if (client_cntx)
-    {
-      DBUG_PRINT("info", ("Preparing to execute client_cntx: 0x%x", client_cntx));
-      my_thread_data->client_low_level_cntx= client_cntx;
-      DBUG_RETURN(TRUE);
-    }
-  }
-  else
-  {
-    DBUG_PRINT("info", ("Transaction active after"));
-  }
-  DBUG_RETURN(FALSE);
-}
-
-static void
-check_execute_queued_query(tp_group_t *my_tp_group,
-                           tp_thread_t *my_thread_data)
-{
-  DBUG_ENTER("check_execute_queued_query");
-
-  if ((my_tp_group->first_queued_query ||
-       my_tp_group->first_ready_query) &&
-       get_active_threads(my_tp_group) <
-       my_tp_group->max_active_threads)
-  {
-     /*
-       There are queries to execute and we're ready to execute one of them
-       which is queued.
-     */
-     execute_ready_query(my_tp_group, my_thread_data, TRUE);
-  }
-  DBUG_VOID_RETURN;
-}
 #endif

=== modified file 'sql/scheduler_thread_pool.h'
--- a/sql/scheduler_thread_pool.h	2010-01-20 15:19:39 +0000
+++ b/sql/scheduler_thread_pool.h	2010-01-22 10:14:49 +0000
@@ -30,8 +30,8 @@
     0x0001 << 4--> 16 groups ... works well in 2009 on 8 and 16 way CPUs
     0x0001 << 5--> 32 groups ... worse than 16 groups by 10%
 */
-#define MAX_THREAD_GROUPS (0x0001 << 0) /* use this for testing */
-//#define MAX_THREAD_GROUPS (0x0001 << 4)
+//#define MAX_THREAD_GROUPS (0x0001 << 0) /* use this for testing */
+#define MAX_THREAD_GROUPS (0x0001 << 4)
 
 bool thd_pool_init(void);
 void thd_pool_add_connection(THD *thd);


Attachment: [text/bzr-bundle] bzr/mikael@mysql.com-20100122101449-5wycfulaqoj337p3.bundle
Thread
bzr commit into mysql-5.5-next-mr branch (mikael:3000) Mikael Ronstrom22 Jan