blob: b439fc0e48b3205ec2caa049d15233ded1b564d1 [file] [log] [blame]
/* Copyright (c) 2011 The Regents of the University of California
* Barret Rhoden <brho@cs.berkeley.edu>
* See LICENSE for details.
*
* Unbounded concurrent queues, user side. Check k/i/r/ucq.h or the
* Documentation for more info. */
#include <ros/arch/membar.h>
#include <parlib/arch/atomic.h>
#include <parlib/arch/arch.h>
#include <parlib/ucq.h>
#include <parlib/spinlock.h>
#include <sys/mman.h>
#include <parlib/assert.h>
#include <parlib/stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <parlib/vcore.h>
#include <parlib/ros_debug.h> /* for printd() */
/* Initializes a ucq. You pass in addresses of mmaped pages for the main page
* (prod_idx) and the spare page. I recommend mmaping a big chunk and breaking
* it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
{
printd("[user] initializing ucq %08p for proc %d\n", ucq, getpid());
assert(!PGOFF(pg1));
assert(!PGOFF(pg2));
/* Prod and cons both start on the first page, slot 0. When they are
* equal, the ucq is empty. */
atomic_set(&ucq->prod_idx, pg1);
atomic_set(&ucq->cons_idx, pg1);
ucq->prod_overflow = FALSE;
atomic_set(&ucq->nr_extra_pgs, 0);
atomic_set(&ucq->spare_pg, pg2);
parlib_static_assert(sizeof(struct spin_pdr_lock) <=
sizeof(ucq->u_lock));
spin_pdr_init((struct spin_pdr_lock*)(&ucq->u_lock));
ucq->ucq_ready = TRUE;
}
/* Inits a ucq, where you don't have to bother with the memory allocation. This
* would be appropriate for one or two UCQs, though if you're allocating in
* bulk, use the raw version. */
void ucq_init(struct ucq *ucq)
{
uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2,
PROT_WRITE | PROT_READ,
MAP_POPULATE | MAP_ANONYMOUS |
MAP_PRIVATE, -1, 0);
assert(two_pages);
ucq_init_raw(ucq, two_pages, two_pages + PGSIZE);
}
/* Only call this on ucq's made with the simple ucq_init(). And be sure the ucq
* is no longer in use. */
void ucq_free_pgs(struct ucq *ucq)
{
uintptr_t pg1 = atomic_read(&ucq->prod_idx);
uintptr_t pg2 = atomic_read(&ucq->spare_pg);
assert(pg1 && pg2);
munmap((void*)pg1, PGSIZE);
munmap((void*)pg2, PGSIZE);
}
/* Consumer side, returns TRUE on success and fills *msg with the ev_msg. If
* the ucq appears empty, it will return FALSE. Messages may have arrived after
* we started getting that we do not receive. */
bool get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
{
uintptr_t my_idx;
struct ucq_page *old_page, *other_page;
struct msg_container *my_msg;
struct spin_pdr_lock *ucq_lock = (struct spin_pdr_lock*)(&ucq->u_lock);
do {
loop_top:
cmb();
my_idx = atomic_read(&ucq->cons_idx);
/* The ucq is empty if the consumer and producer are on the same
* 'next' slot. */
if (my_idx == atomic_read(&ucq->prod_idx))
return FALSE;
/* Is the slot we want good? If not, we're going to need to try
* and move on to the next page. If it is, we bypass all of
* this and try to CAS on us getting my_idx. */
if (slot_is_good(my_idx))
goto claim_slot;
/* Slot is bad, let's try and fix it */
spin_pdr_lock(ucq_lock);
/* Reread the idx, in case someone else fixed things up while we
* were waiting/fighting for the lock */
my_idx = atomic_read(&ucq->cons_idx);
if (slot_is_good(my_idx)) {
/* Someone else fixed it already, let's just try to get
* out */
spin_pdr_unlock(ucq_lock);
/* Make sure this new slot has a producer (ucq isn't
* empty) */
if (my_idx == atomic_read(&ucq->prod_idx))
return FALSE;
goto claim_slot;
}
/* At this point, the slot is bad, and all other possible
* consumers are spinning on the lock. Time to fix things up:
* Set the counter to the next page, and free the old one. */
/* First, we need to wait and make sure the kernel has posted
* the next page. Worst case, we know that the kernel is
* working on it, since prod_idx != cons_idx */
old_page = (struct ucq_page*)PTE_ADDR(my_idx);
while (!old_page->header.cons_next_pg)
cpu_relax();
/* Now set the counter to the next page */
assert(!PGOFF(old_page->header.cons_next_pg));
atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
/* Side note: at this point, any *new* consumers coming in will
* grab slots based off the new counter index (cons_idx) */
/* Now free up the old page. Need to make sure all other
* consumers are done. We spin til enough are done, like an
* inverted refcnt. */
while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
{
/* spinning on userspace here, specifically, another
* vcore and we don't know who it is. This will spin a
* bit, then make sure they aren't preeempted */
cpu_relax_any();
}
/* Now the page is done. 0 its metadata and give it up. */
old_page->header.cons_next_pg = 0;
atomic_set(&old_page->header.nr_cons, 0);
/* We want to "free" the page. We'll try and set it as the
* spare. If there is already a spare, we'll free that one. */
other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
(long)old_page);
assert(!PGOFF(other_page));
if (other_page) {
munmap(other_page, PGSIZE);
atomic_dec(&ucq->nr_extra_pgs);
}
/* All fixed up, unlock. Other consumers may lock and check to
* make sure things are done. */
spin_pdr_unlock(ucq_lock);
/* Now that everything is fixed, try again from the top */
goto loop_top;
claim_slot:
cmb(); /* so we can goto claim_slot */
/* If we're still here, my_idx is good, and we'll try to claim
* it. If we fail, we need to repeat the whole process. */
} while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
assert(slot_is_good(my_idx));
/* Now we have a good slot that we can consume */
my_msg = slot2msg(my_idx);
/* linux would put an rmb_depends() here */
/* Wait til the msg is ready (kernel sets this flag) */
while (!my_msg->ready)
cpu_relax();
rmb(); /* order the ready read before the contents */
/* Copy out */
*msg = my_msg->ev_msg;
/* Unset this for the next usage of the container */
my_msg->ready = FALSE;
wmb(); /* post the ready write before incrementing */
/* Increment nr_cons, showing we're done */
atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
return TRUE;
}
bool ucq_is_empty(struct ucq *ucq)
{
/* The ucq is empty if the consumer and producer are on the same 'next'
* slot. */
return (atomic_read(&ucq->cons_idx) == atomic_read(&ucq->prod_idx));
}