List:Commits« Previous MessageNext Message »
From:jonas Date:January 1 2007 9:58am
Subject:bk commit into 5.0 tree (jonas:1.2262)
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-01-01 09:58:26+01:00, jonas@eel.(none) +2 -0
  ndbmtd - more work

  ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp@stripped, 2007-01-01 09:58:07+01:00, jonas@eel.(none) +0 -3
    remove useless method-calls (not present in mt-version)

  ndb/src/kernel/vm/mt/mt.cpp@stripped, 2007-01-01 09:58:07+01:00, jonas@eel.(none) +78 -26
    more mt-work
    primitive transporter support

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	eel.(none)
# Root:	/home/jonas/src/50-atrt

--- 1.4/ndb/src/kernel/vm/mt/mt.cpp	2007-01-01 09:58:31 +01:00
+++ 1.5/ndb/src/kernel/vm/mt/mt.cpp	2007-01-01 09:58:31 +01:00
@@ -130,6 +130,15 @@
   sl->m_lock = 0;
 }
 
+static
+inline
+int
+trylock(struct thr_spin_lock* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  return xcng(val, 1);
+}
+
 /**
  *
  */
@@ -137,8 +146,8 @@
 {
   static const unsigned SIZE = 8190;
 
-  unsigned m_pos;
   unsigned m_len;
+  unsigned m_pos;
   unsigned m_data[SIZE];
 };  
 
@@ -202,6 +211,8 @@
 struct thr_repository
 {
   int m_thread_count;
+  struct thr_spin_lock m_send_lock;
+  struct thr_spin_lock m_receive_lock;
   struct thr_data m_thread[MAX_THREADS];
   struct thr_safe_pool<thr_job_buffer> m_free_list;
 };
@@ -330,7 +341,7 @@
 static
 inline
 Uint32
-scanqueue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
+scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
 {
   Uint32 thr_no = selfptr->m_thr_no;
   Uint32 **pages = selfptr->m_tq.m_delayed_signals;
@@ -339,7 +350,7 @@
   for (Uint32 i = 0; i < cnt; i++, ptr++)
   {
     Uint32 val = * ptr;
-    if ((val & 0xFFFF) < end)
+    if ((val & 0xFFFF) <= end)
     {
       Uint32 idx = val >> 16;
       Uint32 buf = idx >> 8;
@@ -348,6 +359,8 @@
       Uint32* page = * (pages + buf);
 
       SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
+      if (0)
+	ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
       sendprioa(thr_no, thr_no, s);
       * (page + pos) = free;
       free = idx;
@@ -369,7 +382,7 @@
 
 static
 void
-dotimer(struct thr_data* selfptr)
+scan_time_queues(struct thr_data* selfptr)
 {
   struct thr_tq * tq = &selfptr->m_tq;
   NDB_TICKS now = NdbTick_CurrentMillisecond();
@@ -377,19 +390,28 @@
 
   Uint32 end = tq->m_current_time;
   Uint32 next = tq->m_next_timer;
+  Uint32 cnt0 = tq->m_cnt[0];
+  Uint32 cnt1 = tq->m_cnt[1];
 
   Uint64 diff = now - last;
   if (diff > 0)
   {
     Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
-    end = (end + step) & 0xFFFF;
+    Uint32 next = (end + step) & 0xFFFF;
+    if (next <= end)
+    {
+      ndbout_c("wrap %d -> %d", end, next);
+      abort();
+    }
 
-    Uint32 cnt0 = scanqueue(selfptr, tq->m_cnt[0], end, tq->m_short_queue);
-    Uint32 cnt1 = scanqueue(selfptr, tq->m_cnt[1], end, tq->m_long_queue);
+    end = (end + step) & 0xFFFF;
+    
+    Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
+    Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
 
     tq->m_current_time = end;
-    tq->m_cnt[0] -= cnt0;
-    tq->m_cnt[1] -= cnt1;    
+    tq->m_cnt[0] = cnt0 - tmp0;
+    tq->m_cnt[1] = cnt1 - tmp1;    
     selfptr->m_time = now;
     
     return;
@@ -401,6 +423,32 @@
   abort();
 }
 
+static
+void
+do_send(struct thr_repository* rep, unsigned thr_no)
+{
+  static int cnt = 0;
+  if (cnt == 0)
+  {
+    globalTransporterRegistry.update_connections();
+  }
+  cnt = (cnt + 1) & 15;
+
+  globalTransporterRegistry.performSend();
+  unlock(&rep->m_send_lock);
+}
+
+static
+void
+do_receive(struct thr_repository* rep, unsigned thr_no, unsigned delay)
+{
+  if (globalTransporterRegistry.pollReceive(delay)) 
+  {
+    globalTransporterRegistry.performReceive();
+  }
+  unlock(&rep->m_receive_lock);
+}
+
 void
 thr_main(struct thr_repository* rep, unsigned thr_no)
 {
@@ -417,16 +465,17 @@
   unsigned* a_re_idxptr = &selfptr->m_jba.m_read_index;
   volatile unsigned * a_wr_idxptr = &selfptr->m_jba.m_write_index;
 
-  unsigned jba_read_index = * a_re_idxptr;
-  unsigned jba_write_index = * a_wr_idxptr;
 
   while (globalData.theRestartFlag != perform_stop)
   { 
     unsigned sum = 0;
     unsigned cnt = rep->m_thread_count;
+    nowait.tv_nsec = 10000000;
+
+    scan_time_queues(selfptr);
+    unsigned jba_read_index = * a_re_idxptr;
+    unsigned jba_write_index = * a_wr_idxptr;
 
-    dotimer(selfptr);
-    
     /**
      * Process each inbuffer
      */
@@ -465,6 +514,17 @@
 	  transfer_buffer(rep, thr_no, i);
 	}
       }
+      nowait.tv_nsec = 0;
+    }
+    
+    if (trylock(&rep->m_send_lock) == 0)
+    {
+      do_send(rep, thr_no);
+    }
+    
+    if (trylock(&rep->m_receive_lock) == 0)
+    {
+      do_receive(rep, thr_no, nowait.tv_nsec / 1000000);
     }
     else
     {
@@ -667,7 +727,9 @@
   memcpy(ptr, s, 4*siglen);
 
   if (0)
-    ndbout_c("send %s from %s to %s delay: %d idx: %x %p", 
+    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p", 
+	     selfptr->m_tq.m_current_time,
+	     alarm,
 	     getSignalName(s->theVerId_signalNumber),
 	     getBlockName(refToBlock(s->theSendersBlockRef)),
 	     getBlockName(s->theReceiversBlockNumber),
@@ -778,16 +840,7 @@
   unsigned int i;
   unsigned int thr_no = 0;
   struct thr_repository* rep = &g_thr_repository;
-  unsigned int cnt = rep->m_thread_count;
-  for (i = 0; i<cnt; i++)
-  {
-    thr_job_buffer * jb = rep->m_thread[thr_no].m_out_queue[i];
-    if (jb->m_len)
-    {
-      transfer_buffer(rep, thr_no, i);
-    }
-  }
-
+  
   for (i = 0; i<NO_OF_BLOCKS; i++)
     rep->m_thread[thr_no].m_blocks[i] = globalData.getBlock(i + MIN_BLOCK_NO);
   
@@ -810,7 +863,7 @@
   StartOrd * const  startOrd = (StartOrd *)&signalT.theData[0];
   startOrd->restartInfo = 0;
   
-  sendlocal(0, 0, &signalT.header);
+  senddelay(0, &signalT.header, 1);
   return 0;
 }
 
@@ -1054,7 +1107,6 @@
     /**
      * Normal path 
      */
-    SignalT<25> signalT;
     signalT.m_sectionPtr[0].i = secPtr[0].i;
     signalT.m_sectionPtr[1].i = secPtr[1].i;
     signalT.m_sectionPtr[2].i = secPtr[2].i;

--- 1.25/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2007-01-01 09:58:31 +01:00
+++ 1.26/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2007-01-01 09:58:31 +01:00
@@ -794,9 +794,6 @@
      *
      * Do Restart
      */
-
-    globalScheduler.clear();
-    globalTimeQueue.clear();
     
     // Disconnect all nodes as part of the system restart. 
     // We need to ensure that we are starting up
Thread
bk commit into 5.0 tree (jonas:1.2262)jonas1 Jan