List:Commits« Previous MessageNext Message »
From:jonas Date:November 3 2006 10:50pm
Subject:bk commit into 5.0 tree (jonas:1.2172)
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2006-11-03 23:50:33+01:00, jonas@eel.(none) +1 -0
  new proof-of-concept file...

  ndb/src/kernel/vm/mt.cpp@stripped, 2006-11-03 23:49:46+01:00, jonas@eel.(none) +348 -0
    BitKeeper file /home/jonas/src/50-atrt/ndb/src/kernel/vm/mt.cpp

  ndb/src/kernel/vm/mt.cpp@stripped, 2006-11-03 23:49:46+01:00, jonas@eel.(none) +0 -0

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	eel.(none)
# Root:	/home/jonas/src/50-atrt
--- New file ---
+++ ndb/src/kernel/vm/mt.cpp	06/11/03 23:49:46
#include <sys/uio.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <errno.h>
#include <assert.h>
#include <linux/futex.h>
#include <stdlib.h>
#include <string.h>

#define likely(x) x
#define unlikely(x) x

static inline
int
xcng(unsigned * addr, int val)
{
  asm volatile ("xchg %0, %1;" : "+r" (val) , "+m" (*addr));
  return val;
}

/* This must include the futex call */
#include <asm/unistd.h>

#define __NR_sys_futex __NR_futex

_syscall6(int,sys_futex,
	  unsigned *, futex,
	  int, op,
	  int, val,
	  const struct timespec *, rel,
	  int *, uaddr2,
	  int, val3);

static inline
int 
futex_wait(unsigned * addr, int val, const struct timespec * timeout)
{
  return sys_futex(addr, 
		   FUTEX_WAIT, val, timeout, 0, 0);
}

static inline
int 
futex_wake(unsigned * addr)
{
  return sys_futex(addr, FUTEX_WAKE, 1, 0, 0, 0);
}

#define MAX_THREADS 63

struct thr_wait
{
  unsigned m_futex_state;
  enum {
    FS_RUNNING = 0,
    FS_SLEEPING = 1,
  };
  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
};

static
void
yield(struct thr_wait* wait, const struct timespec *timeout)
{
  unsigned * val = &wait->m_futex_state;
  int old = xcng(val, thr_wait::FS_SLEEPING);
  assert(old == thr_wait::FS_RUNNING);
  int ret;
  while ((ret = futex_wait(val, thr_wait::FS_SLEEPING, timeout)) == EINTR);
  if (ret == ETIMEDOUT)
  {
    xcng(val, thr_wait::FS_RUNNING);
  }
  else
  {
    assert(xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_RUNNING);
  }
}

static
int
wakeup(struct thr_wait* wait)
{
  unsigned * val = &wait->m_futex_state;
  if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
  {
    return futex_wake(val);
  }
  return 0;
}

extern "C"
void *
run(void * arg)
{
  thr_wait* wait = (thr_wait*)arg;
  sleep(1);
  printf("wakeup: %d\n", wakeup(wait));
  sleep(1);
}

struct signalheader 
{
  unsigned len;
  unsigned seccount;
  unsigned bno;
  unsigned gsn;
  unsigned header[3];
  unsigned data[1]; // As specified in len
};

struct Signal
{
  struct signalheader m_header;
};

struct SimulatedBlock
{
  unsigned m_thr_no;
  struct thr_data* m_thr_data;
  struct thread_repository* m_thr_repository;

  void execute(unsigned gsn, struct Signal*);
  void sendsignal(Signal*, unsigned ref_thr);
};

struct globaldata
{
  SimulatedBlock* get(unsigned);
};

struct job_buffer // 32k
{
  unsigned m_len;
  unsigned m_data[8191];

  inline int insert(struct signalheader* s) {
    unsigned len = m_len;
    unsigned* pos = m_data + len;
    
    unsigned siglen = s->len + s->seccount;
    memcpy(pos, s, 4*siglen);
    m_len = len + siglen;
    return (len + siglen + 32) - 8191; // > 0 not full, <=0 full
  }

  unsigned dojob(struct globaldata* g, struct Signal* sig){
    unsigned cnt = 0;
    unsigned pos = 0;
    unsigned len = m_len;
    while (pos < len)
    {
      signalheader* s = reinterpret_cast<signalheader*>(m_data + pos);
      unsigned siglen = s->len + s->seccount;
      unsigned bno = s->bno;
      unsigned gsn = s->gsn;
      SimulatedBlock * block = g->get(bno);
      memcpy(sig, s, 4*siglen);
      block->execute(gsn, sig);
      
      cnt++;
      pos += siglen;
    }
    return cnt;
  }
};

struct job_queue
{
  static const unsigned SIZE = 62;
  unsigned m_read_index; // used by consumer
  unsigned m_write_index; // used by producer
  struct job_buffer* m_buffers[SIZE];
};

struct thr_data
{
  thr_wait m_waiter;
  struct job_buffer* m_out_queue[MAX_THREADS];
  struct job_buffer* m_free_list;
  struct job_queue m_in_queue[MAX_THREADS];
};

template<typename T>
struct thr_safe_pool
{
  T* seize();
};

struct thread_repository
{
  int m_thread_count;
  struct thr_data m_thread[MAX_THREADS];
  struct thr_safe_pool<job_buffer> m_free_list;
};

#ifndef NOCODE
static
job_buffer*
seize_buffer(struct thread_repository* rep, int thr_no)
{
  job_buffer* jb;
  if (likely((jb = rep->m_thread[thr_no].m_free_list)))
  {
    job_buffer* next = reinterpret_cast<job_buffer*>(jb->m_len);
    rep->m_thread[thr_no].m_free_list = next;
    return jb;
  }

  /* global alloc */
  return rep->m_free_list.seize();
}

static
void
release_buffer(struct thread_repository* rep, int thr_no, job_buffer* jb)
{
  job_buffer* next = reinterpret_cast<job_buffer*>(jb->m_len);
  next = rep->m_thread[thr_no].m_free_list;
  rep->m_thread[thr_no].m_free_list = jb;
}

static
void
transfer_buffer(struct thread_repository* rep, int from, int to)
{
  unsigned old;
  unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
  volatile unsigned * readptr = &(rep->m_thread[to].m_in_queue[from].m_read_index);
  unsigned readidx = * writeptr;
  unsigned writeidx = * readptr;
  unsigned nextidx = (writeidx + 1) % job_queue::SIZE;
  job_buffer* src = rep->m_thread[from].m_out_queue[to];

  if (unlikely(readidx == nextidx))
    goto check_full;

do_transfer:  
  rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
  // need storestore barrier
  // but xcng is full barrier
  old = xcng(writeptr, nextidx);
  assert(old == writeidx);
  wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
  
  rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from);
  return ;
check_full:
  for (unsigned i = 0; i<1000; i++)
    if (* readptr != nextidx)
      goto do_transfer;
  
  abort();
}

void
thr_main(struct thread_repository* rep, unsigned thr_no)
{
  struct Signal * signal;
  while (true)
  {
    unsigned sum = 0;
    unsigned cnt = rep->m_thread_count;
    /**
     * Process each inbuffer
     */
    for (unsigned i = 0; i<cnt; i++)
    {
      unsigned ri = rep->m_thread[thr_no].m_in_queue[i].m_read_index;
      unsigned wi = rep->m_thread[thr_no].m_in_queue[i].m_write_index;
      job_buffer * jb = rep->m_thread[thr_no].m_in_queue[i].m_buffers[ri];
      if (ri != wi)
      {
	ri = (ri + 1) % job_queue::SIZE;
	rep->m_thread[thr_no].m_in_queue[i].m_read_index = ri;
	sum += jb->dojob(0, signal);
	release_buffer(rep, thr_no, jb);
      }
    }
    if (sum)
    {
      /**
       * Transfer all out buffers
       */
      for (unsigned i = 0; i<cnt; i++)
      {
	job_buffer * jb = rep->m_thread[thr_no].m_out_queue[i];
	if (jb->m_len)
	{
	  transfer_buffer(rep, thr_no, i);
	}
      }
    }
    else
    {
      yield(&rep->m_thread[thr_no].m_waiter, 0);
    }
  }
}

void
SimulatedBlock::sendsignal(Signal* sig, unsigned thr_no)
{
  struct thread_repository* rep = m_thr_repository;
  int res = m_thr_data->m_out_queue[thr_no]->insert(&sig->m_header);
  if (res <= 0)
  {
    transfer_buffer(rep, m_thr_no, thr_no);
  }
}

#endif

void 
sizes()
{
  printf("sizeof(thr_wait): %d\n", sizeof(thr_wait));
  printf("sizeof(job_buffer): %d\n", sizeof(job_buffer));
  printf("sizeof(job_queue): %d\n", sizeof(job_queue));
  printf("sizeof(thr_data): %d\n", sizeof(thr_data));
  printf("sizeof(thread_repository): %d\n", sizeof(thread_repository));
}

int main(void)
{
#if 0
  thr_wait wait;
  pthread_t thr;
  wait.wakeup();
  pthread_create(&thr, 0, run, &wait);
  wait.yield(0);

  sleep(5);
#endif
#ifdef NOCODE
  sizes();
#else
  thread_repository rep;
  thr_main(&rep, 0);

  return 0;
#endif
}

Thread
bk commit into 5.0 tree (jonas:1.2172)jonas3 Nov