Below is the list of changes that have just been committed into a local
5.0 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-11-03 23:50:33+01:00, jonas@eel.(none) +1 -0
new proof-of-concept file...
ndb/src/kernel/vm/mt.cpp@stripped, 2006-11-03 23:49:46+01:00, jonas@eel.(none) +348 -0
BitKeeper file /home/jonas/src/50-atrt/ndb/src/kernel/vm/mt.cpp
ndb/src/kernel/vm/mt.cpp@stripped, 2006-11-03 23:49:46+01:00, jonas@eel.(none) +0 -0
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: eel.(none)
# Root: /home/jonas/src/50-atrt
--- New file ---
+++ ndb/src/kernel/vm/mt.cpp 06/11/03 23:49:46
#include <sys/uio.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <errno.h>
#include <assert.h>
#include <linux/futex.h>
#include <stdlib.h>
#include <string.h>
#define likely(x) x
#define unlikely(x) x
static inline
int
xcng(unsigned * addr, int val)
{
asm volatile ("xchg %0, %1;" : "+r" (val) , "+m" (*addr));
return val;
}
/* This must include the futex call */
#include <asm/unistd.h>
#define __NR_sys_futex __NR_futex
_syscall6(int,sys_futex,
unsigned *, futex,
int, op,
int, val,
const struct timespec *, rel,
int *, uaddr2,
int, val3);
static inline
int
futex_wait(unsigned * addr, int val, const struct timespec * timeout)
{
return sys_futex(addr,
FUTEX_WAIT, val, timeout, 0, 0);
}
static inline
int
futex_wake(unsigned * addr)
{
return sys_futex(addr, FUTEX_WAKE, 1, 0, 0, 0);
}
#define MAX_THREADS 63
struct thr_wait
{
unsigned m_futex_state;
enum {
FS_RUNNING = 0,
FS_SLEEPING = 1,
};
thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
};
static
void
yield(struct thr_wait* wait, const struct timespec *timeout)
{
unsigned * val = &wait->m_futex_state;
int old = xcng(val, thr_wait::FS_SLEEPING);
assert(old == thr_wait::FS_RUNNING);
int ret;
while ((ret = futex_wait(val, thr_wait::FS_SLEEPING, timeout)) == EINTR);
if (ret == ETIMEDOUT)
{
xcng(val, thr_wait::FS_RUNNING);
}
else
{
assert(xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_RUNNING);
}
}
static
int
wakeup(struct thr_wait* wait)
{
unsigned * val = &wait->m_futex_state;
if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
{
return futex_wake(val);
}
return 0;
}
extern "C"
void *
run(void * arg)
{
thr_wait* wait = (thr_wait*)arg;
sleep(1);
printf("wakeup: %d\n", wakeup(wait));
sleep(1);
}
struct signalheader
{
unsigned len;
unsigned seccount;
unsigned bno;
unsigned gsn;
unsigned header[3];
unsigned data[1]; // As specified in len
};
struct Signal
{
struct signalheader m_header;
};
struct SimulatedBlock
{
unsigned m_thr_no;
struct thr_data* m_thr_data;
struct thread_repository* m_thr_repository;
void execute(unsigned gsn, struct Signal*);
void sendsignal(Signal*, unsigned ref_thr);
};
struct globaldata
{
SimulatedBlock* get(unsigned);
};
struct job_buffer // 32k
{
unsigned m_len;
unsigned m_data[8191];
inline int insert(struct signalheader* s) {
unsigned len = m_len;
unsigned* pos = m_data + len;
unsigned siglen = s->len + s->seccount;
memcpy(pos, s, 4*siglen);
m_len = len + siglen;
return (len + siglen + 32) - 8191; // > 0 not full, <=0 full
}
unsigned dojob(struct globaldata* g, struct Signal* sig){
unsigned cnt = 0;
unsigned pos = 0;
unsigned len = m_len;
while (pos < len)
{
signalheader* s = reinterpret_cast<signalheader*>(m_data + pos);
unsigned siglen = s->len + s->seccount;
unsigned bno = s->bno;
unsigned gsn = s->gsn;
SimulatedBlock * block = g->get(bno);
memcpy(sig, s, 4*siglen);
block->execute(gsn, sig);
cnt++;
pos += siglen;
}
return cnt;
}
};
struct job_queue
{
static const unsigned SIZE = 62;
unsigned m_read_index; // used by consumer
unsigned m_write_index; // used by producer
struct job_buffer* m_buffers[SIZE];
};
struct thr_data
{
thr_wait m_waiter;
struct job_buffer* m_out_queue[MAX_THREADS];
struct job_buffer* m_free_list;
struct job_queue m_in_queue[MAX_THREADS];
};
template<typename T>
struct thr_safe_pool
{
T* seize();
};
struct thread_repository
{
int m_thread_count;
struct thr_data m_thread[MAX_THREADS];
struct thr_safe_pool<job_buffer> m_free_list;
};
#ifndef NOCODE
static
job_buffer*
seize_buffer(struct thread_repository* rep, int thr_no)
{
job_buffer* jb;
if (likely((jb = rep->m_thread[thr_no].m_free_list)))
{
job_buffer* next = reinterpret_cast<job_buffer*>(jb->m_len);
rep->m_thread[thr_no].m_free_list = next;
return jb;
}
/* global alloc */
return rep->m_free_list.seize();
}
static
void
release_buffer(struct thread_repository* rep, int thr_no, job_buffer* jb)
{
job_buffer* next = reinterpret_cast<job_buffer*>(jb->m_len);
next = rep->m_thread[thr_no].m_free_list;
rep->m_thread[thr_no].m_free_list = jb;
}
static
void
transfer_buffer(struct thread_repository* rep, int from, int to)
{
unsigned old;
unsigned * writeptr = &(rep->m_thread[to].m_in_queue[from].m_write_index);
volatile unsigned * readptr = &(rep->m_thread[to].m_in_queue[from].m_read_index);
unsigned readidx = * writeptr;
unsigned writeidx = * readptr;
unsigned nextidx = (writeidx + 1) % job_queue::SIZE;
job_buffer* src = rep->m_thread[from].m_out_queue[to];
if (unlikely(readidx == nextidx))
goto check_full;
do_transfer:
rep->m_thread[to].m_in_queue[from].m_buffers[writeidx] = src;
// need storestore barrier
// but xcng is full barrier
old = xcng(writeptr, nextidx);
assert(old == writeidx);
wakeup(&rep->m_thread[to].m_waiter); // potentially wakeup
rep->m_thread[from].m_out_queue[to] = seize_buffer(rep, from);
return ;
check_full:
for (unsigned i = 0; i<1000; i++)
if (* readptr != nextidx)
goto do_transfer;
abort();
}
void
thr_main(struct thread_repository* rep, unsigned thr_no)
{
struct Signal * signal;
while (true)
{
unsigned sum = 0;
unsigned cnt = rep->m_thread_count;
/**
* Process each inbuffer
*/
for (unsigned i = 0; i<cnt; i++)
{
unsigned ri = rep->m_thread[thr_no].m_in_queue[i].m_read_index;
unsigned wi = rep->m_thread[thr_no].m_in_queue[i].m_write_index;
job_buffer * jb = rep->m_thread[thr_no].m_in_queue[i].m_buffers[ri];
if (ri != wi)
{
ri = (ri + 1) % job_queue::SIZE;
rep->m_thread[thr_no].m_in_queue[i].m_read_index = ri;
sum += jb->dojob(0, signal);
release_buffer(rep, thr_no, jb);
}
}
if (sum)
{
/**
* Transfer all out buffers
*/
for (unsigned i = 0; i<cnt; i++)
{
job_buffer * jb = rep->m_thread[thr_no].m_out_queue[i];
if (jb->m_len)
{
transfer_buffer(rep, thr_no, i);
}
}
}
else
{
yield(&rep->m_thread[thr_no].m_waiter, 0);
}
}
}
void
SimulatedBlock::sendsignal(Signal* sig, unsigned thr_no)
{
struct thread_repository* rep = m_thr_repository;
int res = m_thr_data->m_out_queue[thr_no]->insert(&sig->m_header);
if (res <= 0)
{
transfer_buffer(rep, m_thr_no, thr_no);
}
}
#endif
void
sizes()
{
printf("sizeof(thr_wait): %d\n", sizeof(thr_wait));
printf("sizeof(job_buffer): %d\n", sizeof(job_buffer));
printf("sizeof(job_queue): %d\n", sizeof(job_queue));
printf("sizeof(thr_data): %d\n", sizeof(thr_data));
printf("sizeof(thread_repository): %d\n", sizeof(thread_repository));
}
int main(void)
{
#if 0
thr_wait wait;
pthread_t thr;
wait.wakeup();
pthread_create(&thr, 0, run, &wait);
wait.yield(0);
sleep(5);
#endif
#ifdef NOCODE
sizes();
#else
thread_repository rep;
thr_main(&rep, 0);
return 0;
#endif
}
| Thread |
|---|
| • bk commit into 5.0 tree (jonas:1.2172) | jonas | 3 Nov |