blob: 5ec00f3f56f2b72828e6203c318d2178d7e2b4c5 [file] [log] [blame]
/* Copyright (c) 2010 The Regents of the University of California
* Barret Rhoden <brho@cs.berkeley.edu>
* See LICENSE for details.
*
* Multi-producer, multi-consumer queues. Designed initially for an untrusted
* consumer.
*/
#pragma once
#include <ros/common.h>
#include <ros/bcq_struct.h>
#include <string.h>
/* Pain in the ass includes. Glibc has an atomic.h, and eventually userspace
* will have to deal with the clobbering. */
#ifdef ROS_KERNEL
#include <atomic.h>
/* dequeue uses relax_vc, which is user-only. Some kernel tests call dequeue.*/
#define cpu_relax_vc(x) cpu_relax()
#else
#include <parlib/arch/atomic.h>
#include <parlib/vcore.h>
#endif /* ROS_KERNEL */
/* Bounded Concurrent Queues, untrusted consumer
*
* This is a producer/consumer circular buffer, in which the producer never
* blocks and does not need to trust the data structure (which is writable by
* the consumer).
*
* A producer enqueues an item, based on the indexes of the producer and
* consumer. Enqueue cannot block, but can fail if the queue is full or if it
* fails to enqueue after a certain amount of tries.
*
* prod_idx: the next item to be produced
* cons_pvt_idx: the next item a consumer can claim
* cons_pub_idx: the last item (farthest left / oldest) that hasn't been
* consumed/made ready to be clobbered by the producer (it is
* what the consumer produces). Once all are clear, this will be
* the same as the prod_idx.
*
* The number of free slots in the buffer is: BufSize - (prod_idx - cons_pub)
* The power-of-two nature of the number of elements makes this work when it
* wraps around, just like with Xen. Check it yourself with a counter of 8 and
* bufsizes of 8 and 4.
*
*
* General plan:
*
* Producers compete among themselves, using the prod_idx, to get a free spot.
* Once they have a spot, they fill in the item, and then toggle the "ready for
* consumption" bool for a client. If it cannot find one after a number of
* tries, it simply fails (could be a DoS from the client).
*
* Consumers fight with their private index, which they use to determine who is
* consuming which item. If there is an unconsumed item, they try to advance
* the pvt counter. If they succeed, they can consume the item. The item
* might not be there yet, so they must spin until it is there. Then, the
* consumer copies the item out, and clears the bool (rdy_for_cons).
*
* At this point, the consumer needs to make sure the pub_idx is advanced
* enough so the producer knows the item is free. If pub_idx was their item,
* they move it forward to the next item. If it is not, currently, they spin
* and wait until the previous consumer finishes, and then move it forward.
* This isn't ideal, and we can deal with this in the future.
*
* Enqueue will enqueue the item pointed to by elem. Dequeue will write an
* item into the memory pointed to by elem.
*
* The number of items must be a power of two. In the future, we'll probably
* use Xen's rounding macros. Not using powers of two is a pain, esp with mods
* of negative numbers.
*
* Here's how to use it:
*
* DEFINE_BCQ_TYPES(my_name, my_type, my_size);
* struct my_name_bcq some_bcq;
* bcq_init(&some_bcq, my_type, my_size);
*
* bcq_enqueue(&some_bcq, &some_my_type, my_size, num_fails_okay);
* bcq_dequeue(&some_bcq, &some_my_type, my_size);
*
* They both return 0 on success, or some error code on failure.
*
* TODO later:
* Automatically round up.
*
* Watch out for ABA. Could use ctrs in the top of the indexes. Not really an
* issue, since that would be a wraparound.
*
* Consumers could also just set their bit, and have whoever has the pub_idx
* right before them be the one to advance it all the way up.
*
* Using uint32_t for now, since that's the comp_and_swap we have. We'll
* probably get other sizes once we're sure we like the current one. */
#if 0 // Defined in the included header
struct bcq_header {
uint32_t prod_idx; /* next to be produced in */
uint32_t cons_pub_idx; /* last completely consumed */
uint32_t cons_pvt_idx; /* last a consumer has dibs on */
};
// This is there too:
#define DEFINE_BCQ_TYPES(__name, __elem_t, __num_elems)
#endif
/* Functions */
#define bcq_init(_bcq, _ele_type, _num_elems) \
({ \
memset((_bcq), 0, sizeof(*(_bcq))); \
assert((_num_elems) == ROUNDUPPWR2(_num_elems)); \
})
/* Num empty buffer slots in the BCQ */
#define BCQ_FREE_SLOTS(_p, _cp, _ne) ((_ne) - ((_p) - (_cp)))
/* Really empty */
#define BCQ_EMPTY(_p, _cp, _ne) ((_ne) == BCQ_FREE_SLOTS(_p, _cp, _ne))
/* All work claimed by a consumer */
#define BCQ_NO_WORK(_p, _cpv) ((_p) == (_cpv))
/* Buffer full */
#define BCQ_FULL(_p, _cp, _ne) (0 == BCQ_FREE_SLOTS(_p, _cp, _ne))
/* Figure out the slot you want, bail if it's full, or if you failed too many
* times, CAS to set the new prod. Fill yours in, toggle the bool. Sorry, the
* macro got a bit ugly, esp with the __retval hackery. */
#define bcq_enqueue(_bcq, _elem, _num_elems, _num_fail) \
({ \
uint32_t __prod, __new_prod, __cons_pub, __failctr = 0; \
int __retval = 0; \
do { \
cmb(); \
if (((_num_fail)) && (__failctr++ >= (_num_fail))) { \
__retval = -EFAIL; \
break; \
} \
__prod = (_bcq)->hdr.prod_idx; \
__cons_pub = (_bcq)->hdr.cons_pub_idx; \
if (BCQ_FULL(__prod, __cons_pub, (_num_elems))) { \
__retval = -EBUSY; \
break; \
} \
__new_prod = __prod + 1; \
} while (!atomic_cas_u32(&(_bcq)->hdr.prod_idx, __prod, __new_prod)); \
if (!__retval) { \
/* from here out, __prod is the local __prod that we won */ \
(_bcq)->wraps[__prod & ((_num_elems)-1)].elem = *(_elem); \
wmb(); \
(_bcq)->wraps[__prod & ((_num_elems)-1)].rdy_for_cons = TRUE; \
} \
__retval; \
})
/* Similar to enqueue, spin afterwards til cons_pub is our element, then
* advance it. */
#define bcq_dequeue(_bcq, _elem, _num_elems) \
({ \
uint32_t __prod, __cons_pvt, __new_cons_pvt, __cons_pub; \
int __retval = 0; \
do { \
cmb(); \
__prod = (_bcq)->hdr.prod_idx; \
__cons_pvt = (_bcq)->hdr.cons_pvt_idx; \
if (BCQ_NO_WORK(__prod, __cons_pvt)) { \
__retval = -EBUSY; \
break; \
} \
__new_cons_pvt = (__cons_pvt + 1); \
} while (!atomic_cas_u32(&(_bcq)->hdr.cons_pvt_idx, __cons_pvt, \
__new_cons_pvt)); \
if (!__retval) { \
/* from here out, __cons_pvt is the local __cons_pvt that we */\
/* won. wait for the producer to finish copying it in */ \
while (!(_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons) \
cpu_relax(); \
*(_elem) = (_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].elem; \
(_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons = FALSE; \
/* wait til we're the cons_pub, then advance it by one */ \
while ((_bcq)->hdr.cons_pub_idx != __cons_pvt) \
cpu_relax_vc(vcore_id()); \
(_bcq)->hdr.cons_pub_idx = __cons_pvt + 1; \
} \
__retval; \
})
/* Checks of a bcq is empty (meaning no work), instead of trying to dequeue */
#define bcq_empty(_bcq) \
BCQ_NO_WORK((_bcq)->hdr.prod_idx, (_bcq)->hdr.cons_pvt_idx)
#define bcq_nr_full(_bcq) \
((_bcq)->hdr.prod_idx - (_bcq)->hdr.cons_pub_idx)