List:Commits« Previous MessageNext Message »
From:knielsen Date:November 1 2007 3:04pm
Subject:bk commit into 5.1 tree (knielsen:1.2615)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-11-01 15:04:05+01:00, knielsen@ymer.(none) +1 -0
  WL#1498: Multithreaded ndbd.
  
  Fix signal trace dumps, broken by recent update of signal scheduling.
  Now we look at all available buffers for the next signal to dump, as
  now the sequences of signal execution and job buffer release need no
  longer match.

  storage/ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-11-01 15:04:02+01:00,
knielsen@ymer.(none) +84 -126
    Fix signal trace dumps, broken by recent update of signal scheduling.
    Now we look at all available buffers for the next signal to dump, as
    now the sequences of signal execution and job buffer release need no
    longer match.

diff -Nrup a/storage/ndb/src/kernel/vm/mt/mt.cpp b/storage/ndb/src/kernel/vm/mt/mt.cpp
--- a/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-11-01 11:30:29 +01:00
+++ b/storage/ndb/src/kernel/vm/mt/mt.cpp	2007-11-01 15:04:02 +01:00
@@ -1798,149 +1798,107 @@ FastScheduler::dumpSignalMemory(Uint32 t
   Uint32 seq_end = 0;
 
   const thr_data *thr_ptr = &g_thr_repository.m_thread[thr_no];
+  if (watchDogCounter)
+    *watchDogCounter = 4;
+
   /*
     ToDo: Might do some sanity check to avoid crashing on not yet initialised
     thread.
   */
 
   /* Scan all available buffers with already executed signals. */
-  Uint32 jbb_current_thr = 0;
-  Uint32 jbb_start = thr_ptr->m_first_free;
-  Uint32 jb_end = thr_ptr->m_first_unused;
-  const thr_job_buffer *jbb = NULL;
-  Uint32 jbb_pos, jbb_len;
-  bool no_more_priob = false;
-  const thr_job_buffer *jba = NULL;
-  Uint32 jba_start = jbb_start;
-  Uint32 jba_pos, jba_len;
-  bool no_more_prioa = false;
-
-  if (watchDogCounter)
-    *watchDogCounter = 4;
 
-  for (;;)
+  /*
+   * Keep track of all available buffers, so that we can pick out signals in
+   * the same order they were executed (order obtained from signal id).
+   *
+   * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
+   * (and freed) buffers, plus MAX_THREADS buffers for currently active
+   * prio B buffers, plus one active prio A buffer.
+   */
+  struct {
+    const thr_job_buffer *m_jb;
+    Uint32 m_pos;
+    Uint32 m_max;
+  } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
+
+  Uint32 num_jbs = 0;
+
+  /* Load released buffers. */
+  Uint32 idx = thr_ptr->m_first_free;
+  while (idx != thr_ptr->m_first_unused)
   {
-    if (jba == NULL && !no_more_prioa)
+    const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
+    if (q->m_len > 0)
     {
-      /* Find the next released buffer containing prio A signals, if any. */
-      while (jba_start != jb_end)
-      {
-        const thr_job_buffer *tmp = thr_ptr->m_free_fifo[jba_start];
-        jba_start = (jba_start + 1) % THR_FREE_BUF_MAX;
-        if (tmp->m_len > 0 && tmp->m_prioa)
-        {
-          jba = tmp;
-          jba_pos = 0;
-          jba_len = jba->m_len;
-          break;
-        }
-      }
-      if (jba == NULL)
-      {
-        /*
-          No more released prio A buffers, but there might be some stuff
-          in the current buffer.
-        */
-        Uint32 jba_execute_pos = thr_ptr->m_jba_read_state.m_read_pos;
-        if (jba_execute_pos > 0)
-        {
-          Uint32 jba_current = thr_ptr->m_jba_read_state.m_read_index;
-          jba = thr_ptr->m_jba.m_buffers[jba_current];
-          jba_pos = 0;
-          jba_len = jba_execute_pos;
-        }
-        else
-          jba = NULL;
-        no_more_prioa = true;
-      }
+      jbs[num_jbs].m_jb = q;
+      jbs[num_jbs].m_pos = 0;
+      jbs[num_jbs].m_max = q->m_len;
+      num_jbs++;
     }
+    idx = (idx + 1) % THR_FREE_BUF_MAX;
+  }
+  /* Load any active prio B buffers. */
+  for (Uint32 thr_no = 0; thr_no < g_thr_repository.m_thread_count; thr_no++)
+  {
+    const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
+    const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
+    Uint32 read_pos = r->m_read_pos;
+    if (read_pos > 0)
+    {
+      jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
+      jbs[num_jbs].m_pos = 0;
+      jbs[num_jbs].m_max = read_pos;
+      num_jbs++;
+    }
+  }
+  /* Load any active prio A buffer. */
+  const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
+  Uint32 read_pos = r->m_read_pos;
+  if (read_pos > 0)
+  {
+    jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
+    jbs[num_jbs].m_pos = 0;
+    jbs[num_jbs].m_max = read_pos;
+    num_jbs++;
+  }
+
+  /* Now pick out one signal at a time, in signal id order. */
+  while (num_jbs > 0)
+  {
+    if (watchDogCounter)
+      *watchDogCounter = 4;
 
-    if (jbb == NULL && !no_more_priob)
+    /* Search out the smallest signal id remaining. */
+    Uint32 idx_min = 0;
+    const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
+    const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
+    Uint32 sid_min = s_min->theSignalId;
+
+    for (Uint32 i = 1; i < num_jbs; i++)
     {
-      /* Find the next released buffer containing prio B signals, if any. */
-      while (jbb_start != jb_end)
-      {
-        const thr_job_buffer *tmp = thr_ptr->m_free_fifo[jbb_start];
-        jbb_start = (jbb_start + 1) % THR_FREE_BUF_MAX;
-        if (tmp->m_len > 0 && !tmp->m_prioa)
-        {
-          jbb = tmp;
-          jbb_pos = 0;
-          jbb_len = jbb->m_len;
-          break;
-        }
-      }
-      if (jbb == NULL)
+      p = jbs[i].m_jb->m_data + jbs[i].m_pos;
+      const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
+      Uint32 sid = s->theSignalId;
+      if (wrap_compare(sid, sid_min) < 0)
       {
-        /*
-          No more released prio B buffers, but there might be some stuff
-          in the buffer(s) currently being processed.
-        */
-        while (jbb_current_thr < g_thr_repository.m_thread_count)
-        {
-          const thr_job_queue *q = &(thr_ptr->m_in_queue[jbb_current_thr]);
-          const thr_jb_read_state *r = &(thr_ptr->m_read_states[jbb_current_thr]);
-          jbb_current_thr++;
-          Uint32 read_index = r->m_read_index;
-          Uint32 read_pos = r->m_read_pos;
-          if (read_pos > 0)
-          {
-            jbb = q->m_buffers[read_index];
-            jbb_pos = 0;
-            jbb_len = read_pos;
-            break;
-          }
-        }
-        if (jbb == NULL)
-          no_more_priob = true;
+        idx_min = i;
+        s_min = s;
+        sid_min = sid;
       }
     }
-    const SignalHeader *s_a;
-    if (jba)
-      s_a = reinterpret_cast<const SignalHeader*>(&(jba->m_data[jba_pos]));
-    else
-      s_a = NULL;
-
-    const SignalHeader *s_b;
-    if (jbb)
-      s_b = reinterpret_cast<const SignalHeader*>(&(jbb->m_data[jbb_pos]));
-    else
-      s_b = NULL;
-
-    /* Check if we're all done. */
-    if (!s_a && !s_b)
-      break;
-
-    /*
-      If we have both a prio A and a prio B, take the one with the lowest
-      signal id (taking into account that 0x00000000 is just after 0xffffffff).
-    */
-    int cmp;                    // <0 (a < b) -> use a, >0 -> (a > b)
-> use b
-    if (!s_a)
-      cmp = 1;
-    else if (!s_b)
-      cmp = -1;
-    else
-      cmp = wrap_compare(s_a->theSignalId, s_b->theSignalId);
 
-    /* cmp=0 is impossible, unless we somehow get >2**32 signals of history. */
-    if (cmp < 0)
-    {
-      /* Next is prio A. */
-      signalSequence[seq_end].ptr = s_a;
-      signalSequence[seq_end].prioa = true;
-      jba_pos += (sizeof(*s_a)>>2) + s_a->m_noOfSections + s_a->theLength;
-      if (jba_pos >= jba_len)
-        jba = NULL;             // Done with this buffer
-    }
-    else
+    /* We found the next signal, now put it in the ordered cyclic buffer. */
+    signalSequence[seq_end].ptr = s_min;
+    signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
+    Uint32 siglen =
+      (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
+    jbs[idx_min].m_pos += siglen;
+    if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
     {
-      /* Next is prio B. */
-      signalSequence[seq_end].ptr = s_b;
-      signalSequence[seq_end].prioa = false;
-      jbb_pos += (sizeof(*s_b)>>2) + s_b->m_noOfSections + s_b->theLength;
-      if (jbb_pos >= jbb_len)
-        jbb = NULL;             // Done with this buffer
+      /* We are done with this job buffer. */
+      num_jbs--;
+      jbs[idx_min] = jbs[num_jbs];
     }
     seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
     /* Drop old signals if too many available in history. */
Thread
bk commit into 5.1 tree (knielsen:1.2615)knielsen1 Nov