| #include <ros/common.h> |
| #include <futex.h> |
| #include <sys/queue.h> |
| #include <pthread.h> |
| #include <parlib.h> |
| #include <assert.h> |
| #include <stdio.h> |
| #include <errno.h> |
| #include <slab.h> |
| #include <mcs.h> |
| #include <alarm.h> |
| |
// Forward declarations of the file-local helpers.

// Wake up to `count` waiters blocked on `uaddr`; returns how many were woken.
static inline int futex_wake(int *uaddr, int count);
// Block on `uaddr` while *uaddr == val. The timeout is expressed in
// microseconds (the definition and futex() both use microseconds);
// (uint64_t)-1 means "no timeout".
static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout);
// NOTE(review): no definition of timer_thread is visible in this file --
// confirm whether this declaration is still needed.
static void *timer_thread(void *arg);
| |
| struct futex_element { |
| TAILQ_ENTRY(futex_element) link; |
| pthread_t pthread; |
| int *uaddr; |
| uint64_t us_timeout; |
| struct alarm_waiter awaiter; |
| bool timedout; |
| }; |
| TAILQ_HEAD(futex_queue, futex_element); |
| |
| struct futex_data { |
| struct mcs_pdr_lock lock; |
| struct futex_queue queue; |
| }; |
| static struct futex_data __futex; |
| |
| static inline void futex_init() |
| { |
| mcs_pdr_init(&__futex.lock); |
| TAILQ_INIT(&__futex.queue); |
| } |
| |
| static void __futex_timeout(struct alarm_waiter *awaiter) { |
| struct futex_element *__e = NULL; |
| struct futex_element *e = (struct futex_element*)awaiter->data; |
| //printf("timeout fired: %p\n", e->uaddr); |
| |
| // Atomically remove the timed-out element from the futex queue if we won the |
| // race against actually completing. |
| mcs_pdr_lock(&__futex.lock); |
| TAILQ_FOREACH(__e, &__futex.queue, link) |
| if (__e == e) break; |
| if (__e != NULL) |
| TAILQ_REMOVE(&__futex.queue, e, link); |
| mcs_pdr_unlock(&__futex.lock); |
| |
| // If we removed it, restart it outside the lock |
| if (__e != NULL) { |
| e->timedout = true; |
| //printf("timeout: %p\n", e->uaddr); |
| uthread_runnable((struct uthread*)e->pthread); |
| } |
| // Set this as the very last thing we do whether we successfully woke the |
| // thread blocked on the futex or not. Either we set this or wake() sets |
| // this, not both. Spin on this in the bottom-half of the wait() code to |
| // ensure there are no more references to awaiter before freeing the memory |
| // for it. |
| e->awaiter.data = NULL; |
| } |
| |
| static void __futex_block(struct uthread *uthread, void *arg) { |
| pthread_t pthread = (pthread_t)uthread; |
| struct futex_element *e = (struct futex_element*)arg; |
| |
| // Set the remaining properties of the futex element |
| e->pthread = pthread; |
| e->timedout = false; |
| |
| // Insert the futex element into the queue |
| TAILQ_INSERT_TAIL(&__futex.queue, e, link); |
| |
| // Set an alarm for the futex timeout if applicable |
| if(e->us_timeout != (uint64_t)-1) { |
| e->awaiter.data = e; |
| init_awaiter(&e->awaiter, __futex_timeout); |
| set_awaiter_rel(&e->awaiter, e->us_timeout); |
| //printf("timeout set: %p\n", e->uaddr); |
| set_alarm(&e->awaiter); |
| } |
| |
| // Notify the scheduler of the type of yield we did |
| __pthread_generic_yield(pthread); |
| pthread->state = PTH_BLK_MUTEX; |
| |
| // Unlock the pdr_lock |
| mcs_pdr_unlock(&__futex.lock); |
| } |
| |
| static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout) |
| { |
| // Atomically do the following... |
| mcs_pdr_lock(&__futex.lock); |
| // If the value of *uaddr matches val |
| if(*uaddr == val) { |
| //printf("wait: %p, %d\n", uaddr, us_timeout); |
| // Create a new futex element and initialize it. |
| struct futex_element e; |
| e.uaddr = uaddr; |
| e.us_timeout = us_timeout; |
| // Yield the uthread... |
| // We set the remaining properties of the futex element, set the timeout |
| // timer, and unlock the pdr lock on the other side. It is important that |
| // we do the unlock on the other side, because (unlike linux, etc.) its |
| // possible to get interrupted and drop into vcore context right after |
| // releasing the lock. If that vcore code then calls futex_wake(), we |
| // would be screwed. Doing things this way means we have to hold the lock |
| // longer, but its necessary for correctness. |
| uthread_yield(TRUE, __futex_block, &e); |
| // We are unlocked here! |
| |
| // If this futex had a timeout, spin briefly to make sure that all |
| // references to e are gone between the wake() and the timeout() code. We |
| // use e.awaiter.data to do this. |
| if(e.us_timeout != (uint64_t)-1) |
| while (e.awaiter.data != NULL) |
| cpu_relax(); |
| |
| // After waking, if we timed out, set the error |
| // code appropriately and return |
| if(e.timedout) { |
| errno = ETIMEDOUT; |
| return -1; |
| } |
| } else { |
| mcs_pdr_unlock(&__futex.lock); |
| } |
| return 0; |
| } |
| |
| static inline int futex_wake(int *uaddr, int count) |
| { |
| int max = count; |
| struct futex_element *e,*n = NULL; |
| struct futex_queue q = TAILQ_HEAD_INITIALIZER(q); |
| |
| // Atomically grab all relevant futex blockers |
| // from the global futex queue |
| mcs_pdr_lock(&__futex.lock); |
| e = TAILQ_FIRST(&__futex.queue); |
| while(e != NULL) { |
| if(count > 0) { |
| n = TAILQ_NEXT(e, link); |
| if(e->uaddr == uaddr) { |
| TAILQ_REMOVE(&__futex.queue, e, link); |
| TAILQ_INSERT_TAIL(&q, e, link); |
| count--; |
| } |
| e = n; |
| } |
| else break; |
| } |
| mcs_pdr_unlock(&__futex.lock); |
| |
| // Unblock them outside the lock |
| e = TAILQ_FIRST(&q); |
| while(e != NULL) { |
| n = TAILQ_NEXT(e, link); |
| TAILQ_REMOVE(&q, e, link); |
| // Cancel the timeout if one was set |
| if(e->us_timeout != (uint64_t)-1) { |
| // Try and unset the alarm. If this fails, then we have already |
| // started running the alarm callback. If it succeeds, then we can |
| // set awaiter->data to NULL so that the bottom half of wake can |
| // proceed. Either we set awaiter->data to NULL or __futex_timeout |
| // does. The fact that we made it here though, means that WE are the |
| // one who removed e from the queue, so we are basically just |
| // deciding who should set awaiter->data to NULL to indicate that |
| // there are no more references to it. |
| if(unset_alarm(&e->awaiter)) { |
| //printf("timeout canceled: %p\n", e->uaddr); |
| e->awaiter.data = NULL; |
| } |
| } |
| //printf("wake: %p\n", uaddr); |
| uthread_runnable((struct uthread*)e->pthread); |
| e = n; |
| } |
| return max-count; |
| } |
| |
| int futex(int *uaddr, int op, int val, |
| const struct timespec *timeout, |
| int *uaddr2, int val3) |
| { |
| // Round to the nearest micro-second |
| uint64_t us_timeout = (uint64_t)-1; |
| assert(uaddr2 == NULL); |
| assert(val3 == 0); |
| if(timeout != NULL) { |
| us_timeout = timeout->tv_sec*1000000L + timeout->tv_nsec/1000L; |
| assert(us_timeout > 0); |
| } |
| |
| run_once(futex_init()); |
| switch(op) { |
| case FUTEX_WAIT: |
| return futex_wait(uaddr, val, us_timeout); |
| case FUTEX_WAKE: |
| return futex_wake(uaddr, val); |
| default: |
| errno = ENOSYS; |
| return -1; |
| } |
| return -1; |
| } |
| |