blob: 1cf3bb13437f1dddcfdca54c1351b4c3d9f25c03 [file] [log] [blame]
#include <ros/common.h>
#include <futex.h>
#include <sys/queue.h>
#include <pthread.h>
#include <parlib.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <slab.h>
#include <mcs.h>
static inline int futex_wake(int *uaddr, int count);
static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
static void *timer_thread(void *arg);
struct futex_element {
TAILQ_ENTRY(futex_element) link;
pthread_t pthread;
int *uaddr;
uint64_t ms_timeout;
bool timedout;
};
TAILQ_HEAD(futex_queue, futex_element);
struct futex_data {
struct mcs_pdr_lock lock;
struct futex_queue queue;
int timer_enabled;
pthread_t timer;
long time;
};
static struct futex_data __futex;
static inline void futex_init()
{
mcs_pdr_init(&__futex.lock);
TAILQ_INIT(&__futex.queue);
__futex.timer_enabled = false;
pthread_create(&__futex.timer, NULL, timer_thread, NULL);
__futex.time = 0;
}
static void __futex_block(struct uthread *uthread, void *arg) {
pthread_t pthread = (pthread_t)uthread;
struct futex_element *e = (struct futex_element*)arg;
__pthread_generic_yield(pthread);
pthread->state = PTH_BLK_MUTEX;
e->pthread = pthread;
}
static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout)
{
// Atomically do the following...
mcs_pdr_lock(&__futex.lock);
// If the value of *uaddr matches val
if(*uaddr == val) {
// Create a new futex element and initialize it.
struct futex_element e;
bool enable_timer = false;
e.uaddr = uaddr;
e.pthread = NULL;
e.ms_timeout = ms_timeout;
e.timedout = false;
if(e.ms_timeout != (uint64_t)-1) {
e.ms_timeout += __futex.time;
// If we are setting the timeout, get ready to
// enable the timer if it is currently disabled.
if(__futex.timer_enabled == false) {
__futex.timer_enabled = true;
enable_timer = true;
}
}
// Insert the futex element into the queue
TAILQ_INSERT_TAIL(&__futex.queue, &e, link);
mcs_pdr_unlock(&__futex.lock);
// Enable the timer if we need to outside the lock
if(enable_timer)
futex_wake(&__futex.timer_enabled, 1);
// Yield the current uthread
uthread_yield(TRUE, __futex_block, &e);
// After waking, if we timed out, set the error
// code appropriately and return
if(e.timedout) {
errno = ETIMEDOUT;
return -1;
}
}
else {
mcs_pdr_unlock(&__futex.lock);
}
return 0;
}
static inline int futex_wake(int *uaddr, int count)
{
struct futex_element *e,*n = NULL;
struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
// Atomically grab all relevant futex blockers
// from the global futex queue
mcs_pdr_lock(&__futex.lock);
e = TAILQ_FIRST(&__futex.queue);
while(e != NULL) {
if(count > 0) {
n = TAILQ_NEXT(e, link);
if(e->uaddr == uaddr) {
TAILQ_REMOVE(&__futex.queue, e, link);
TAILQ_INSERT_TAIL(&q, e, link);
count--;
}
e = n;
}
else break;
}
mcs_pdr_unlock(&__futex.lock);
// Unblock them outside the lock
e = TAILQ_FIRST(&q);
while(e != NULL) {
n = TAILQ_NEXT(e, link);
TAILQ_REMOVE(&q, e, link);
while(e->pthread == NULL)
cpu_relax();
uthread_runnable((struct uthread*)e->pthread);
e = n;
}
return 0;
}
static void *timer_thread(void *arg)
{
struct futex_element *e,*n = NULL;
struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
// Do this forever...
for(;;) {
// Block for 1 millisecond
sys_block(1000);
// Then atomically do the following...
mcs_pdr_lock(&__futex.lock);
// Up the time
__futex.time++;
// Find all futexes that have timed out on this iteration,
// and count those still waiting
int waiting = 0;
e = TAILQ_FIRST(&__futex.queue);
while(e != NULL) {
n = TAILQ_NEXT(e, link);
if(e->ms_timeout == __futex.time) {
e->timedout = true;
TAILQ_REMOVE(&__futex.queue, e, link);
TAILQ_INSERT_TAIL(&q, e, link);
}
else if(e->ms_timeout != (uint64_t)-1)
waiting++;
e = n;
}
// If there are no more waiting, disable the timer
if(waiting == 0) {
__futex.time = 0;
__futex.timer_enabled = false;
}
mcs_pdr_unlock(&__futex.lock);
// Unblock any futexes that have timed out outside the lock
e = TAILQ_FIRST(&q);
while(e != NULL) {
n = TAILQ_NEXT(e, link);
TAILQ_REMOVE(&q, e, link);
while(e->pthread == NULL)
cpu_relax();
uthread_runnable((struct uthread*)e->pthread);
e = n;
}
// If we have disabled the timer, park this thread
futex_wait(&__futex.timer_enabled, false, -1);
}
}
int futex(int *uaddr, int op, int val,
const struct timespec *timeout,
int *uaddr2, int val3)
{
uint64_t ms_timeout = (uint64_t)-1;
assert(uaddr2 == NULL);
assert(val3 == 0);
if(timeout != NULL) {
ms_timeout = timeout->tv_sec*1000 + timeout->tv_nsec/1000000L;
assert(ms_timeout > 0);
}
run_once(futex_init());
switch(op) {
case FUTEX_WAIT:
return futex_wait(uaddr, val, ms_timeout);
case FUTEX_WAKE:
return futex_wake(uaddr, val);
default:
errno = ENOSYS;
return -1;
}
return -1;
}