/* Copyright (c) 2010 The Regents of the University of California
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Multi-producer, multi-consumer queues. Designed initially for an untrusted
 * consumer.
 */

#pragma once

#include <ros/common.h>
#include <ros/bcq_struct.h>
#include <string.h>

/* Pain in the ass includes. Glibc has an atomic.h, and eventually userspace
 * will have to deal with the clobbering. */
#ifdef ROS_KERNEL
#include <atomic.h>
/* dequeue uses relax_vc, which is user-only. Some kernel tests call
 * dequeue. */
#define cpu_relax_vc(x) cpu_relax()
#else
#include <parlib/arch/atomic.h>
#include <parlib/vcore.h>
#endif /* ROS_KERNEL */

/* Bounded Concurrent Queues, untrusted consumer
 *
 * This is a producer/consumer circular buffer, in which the producer never
 * blocks and does not need to trust the data structure (which is writable by
 * the consumer).
 *
 * A producer enqueues an item, based on the indexes of the producer and
 * consumer. Enqueue cannot block, but it can fail if the queue is full or if
 * it fails to enqueue after a certain number of tries.
 *
 * prod_idx:     the next item to be produced
 * cons_pvt_idx: the next item a consumer can claim
 * cons_pub_idx: the oldest item (farthest left) that has not yet been
 *               consumed and made ready to be clobbered by the producer
 *               (advancing it is what the consumer "produces" back to the
 *               producer). Once everything has been consumed, it will be the
 *               same as prod_idx.
 *
 * The number of free slots in the buffer is: BufSize - (prod_idx - cons_pub)
 * The power-of-two number of elements makes this work even when the indexes
 * wrap around, just like with Xen's rings. Check it yourself with a counter
 * of 8 and buffer sizes of 8 and 4, as in the example below.
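 *
 * For instance (a sketch with made-up numbers, using a 3-bit counter that
 * wraps at 8 and a buffer of 4 elements): suppose prod_idx has wrapped to 1
 * (9 items produced in total) while cons_pub_idx is still 6. With unsigned
 * 3-bit math, prod_idx - cons_pub_idx = 1 - 6 = -5 mod 8 = 3, so
 * FreeSlots = 4 - 3 = 1, which is correct: items 6, 7, and 8 are still in
 * flight and one slot is open. The same arithmetic holds for the real
 * uint32_t indexes, which wrap at 2^32.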
 *
 *
 * General plan:
 *
 * Producers compete among themselves, using the prod_idx, to get a free slot.
 * Once a producer has a slot, it fills in the item and then toggles the
 * "ready for consumption" bool for the consumer. If a producer cannot win a
 * slot after a number of tries, it simply fails (which could be a DoS from
 * the client).
 *
 * Consumers fight with their private index, which they use to determine who
 * is consuming which item. If there is an unconsumed item, they try to
 * advance the pvt counter. If they succeed, they can consume the item. The
 * item might not be there yet, so they must spin until it is. Then the
 * consumer copies the item out and clears the bool (rdy_for_cons).
 *
 * At this point, the consumer needs to make sure the pub_idx is advanced
 * far enough that the producer knows the item is free. If pub_idx was their
 * item, they move it forward to the next item. If it is not, currently they
 * spin and wait until the previous consumer finishes, and then move it
 * forward. This isn't ideal, and we can deal with it in the future.
 *
 * Enqueue will enqueue the item pointed to by elem. Dequeue will write an
 * item into the memory pointed to by elem.
 *
 * The number of items must be a power of two. In the future, we'll probably
 * use Xen's rounding macros. Not using powers of two is a pain, esp. with
 * mods of negative numbers.
 *
 * Here's how to use it:
 *
 *      DEFINE_BCQ_TYPES(my_name, my_type, my_size);
 *      struct my_name_bcq some_bcq;
 *      bcq_init(&some_bcq, my_type, my_size);
 *
 *      bcq_enqueue(&some_bcq, &some_my_type, my_size, num_fails_okay);
 *      bcq_dequeue(&some_bcq, &some_my_type, my_size);
 *
 * They both return 0 on success, or some error code on failure.
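 *
 * A slightly fuller sketch (my_pkt, q, and the handle_* calls are made up
 * for illustration):
 *
 *      struct my_pkt {
 *              int payload;
 *      };
 *      DEFINE_BCQ_TYPES(pkt, struct my_pkt, 16);
 *      struct pkt_bcq q;
 *      struct my_pkt in = { .payload = 42 }, out;
 *
 *      bcq_init(&q, struct my_pkt, 16);
 *      if (bcq_enqueue(&q, &in, 16, 10))
 *              handle_full_or_dos();
 *      if (bcq_dequeue(&q, &out, 16))
 *              handle_empty();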
 *
 * TODO later:
 * Automatically round up.
 *
 * Watch out for ABA. Could use counters in the top bits of the indexes. Not
 * really an issue, since that would be a wraparound.
 *
 * Consumers could also just set their bit, and have whoever has the pub_idx
 * right before them be the one to advance it all the way up.
 *
 * Using uint32_t for now, since that's the comp_and_swap we have. We'll
 * probably get other sizes once we're sure we like the current one. */

#if 0	/* Defined in the included header */

struct bcq_header {
        uint32_t prod_idx;      /* next to be produced in */
        uint32_t cons_pub_idx;  /* last completely consumed */
        uint32_t cons_pvt_idx;  /* last a consumer has dibs on */
};

/* This is there too: */
#define DEFINE_BCQ_TYPES(__name, __elem_t, __num_elems)
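
/* For DEFINE_BCQ_TYPES(my_name, my_type, my_size), the generated types look
 * roughly like this (a sketch, inferred from how the macros below use hdr
 * and wraps[]; see ros/bcq_struct.h for the real definition): */
struct my_name_bcq_wrap {
        my_type elem;
        bool rdy_for_cons;      /* elem is ready for consumption */
};
struct my_name_bcq {
        struct bcq_header hdr;
        struct my_name_bcq_wrap wraps[my_size];
};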

#endif

/* Functions */
#define bcq_init(_bcq, _ele_type, _num_elems)                                 \
({                                                                            \
        memset((_bcq), 0, sizeof(*(_bcq)));                                   \
        /* the element count must be a power of 2 */                          \
        assert((_num_elems) == ROUNDUPPWR2(_num_elems));                      \
})

/* Number of empty buffer slots in the BCQ */
#define BCQ_FREE_SLOTS(_p, _cp, _ne) ((_ne) - ((_p) - (_cp)))

/* Truly empty: every slot is free */
#define BCQ_EMPTY(_p, _cp, _ne) ((_ne) == BCQ_FREE_SLOTS(_p, _cp, _ne))

/* All work has been claimed by a consumer */
#define BCQ_NO_WORK(_p, _cpv) ((_p) == (_cpv))

/* Buffer is full */
#define BCQ_FULL(_p, _cp, _ne) (0 == BCQ_FREE_SLOTS(_p, _cp, _ne))
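
/* For example (made-up snapshot values): with _ne = 8, _p = 5, and _cp = 2,
 * BCQ_FREE_SLOTS = 8 - (5 - 2) = 5 free slots, and BCQ_FULL would need _p to
 * reach 10. If a consumer has claimed everything (_cpv == _p == 5),
 * BCQ_NO_WORK is true even though BCQ_EMPTY is not, since the claimed items
 * haven't been fully consumed yet (cons_pub is still behind). */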

/* Figure out the slot you want; bail if the queue is full or if you failed
 * too many times; CAS to claim the new prod slot. Then fill yours in and
 * toggle the bool. Sorry, the macro got a bit ugly, esp. with the __retval
 * hackery. */
#define bcq_enqueue(_bcq, _elem, _num_elems, _num_fail)                       \
({                                                                            \
        uint32_t __prod, __new_prod, __cons_pub, __failctr = 0;               \
        int __retval = 0;                                                     \
        do {                                                                  \
                cmb();                                                        \
                if ((_num_fail) && (__failctr++ >= (_num_fail))) {            \
                        __retval = -EFAIL;                                    \
                        break;                                                \
                }                                                             \
                __prod = (_bcq)->hdr.prod_idx;                                \
                __cons_pub = (_bcq)->hdr.cons_pub_idx;                        \
                if (BCQ_FULL(__prod, __cons_pub, (_num_elems))) {             \
                        __retval = -EBUSY;                                    \
                        break;                                                \
                }                                                             \
                __new_prod = __prod + 1;                                      \
        } while (!atomic_cas_u32(&(_bcq)->hdr.prod_idx, __prod, __new_prod)); \
        if (!__retval) {                                                      \
                /* from here out, __prod is the local __prod that we won */   \
                (_bcq)->wraps[__prod & ((_num_elems)-1)].elem = *(_elem);     \
                /* make the elem visible before setting the ready flag */     \
                wmb();                                                        \
                (_bcq)->wraps[__prod & ((_num_elems)-1)].rdy_for_cons = TRUE; \
        }                                                                     \
        __retval;                                                             \
})
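
/* Typical producer-side use, as a sketch (q, my_pkt, and retry_or_drop are
 * made up for illustration):
 *
 *      struct my_pkt pkt = { .payload = 42 };
 *
 *      if (bcq_enqueue(&q, &pkt, 16, 10) == -EBUSY)
 *              retry_or_drop();
 *
 * -EBUSY means the queue was full (the consumer is behind), and -EFAIL means
 * we lost the CAS race 10 times in a row, which may indicate a misbehaving
 * (DoS-ing) consumer. */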

/* Similar to enqueue, but afterwards we spin until cons_pub reaches our
 * element, then advance it. */
#define bcq_dequeue(_bcq, _elem, _num_elems)                                  \
({                                                                            \
        uint32_t __prod, __cons_pvt, __new_cons_pvt;                          \
        int __retval = 0;                                                     \
        do {                                                                  \
                cmb();                                                        \
                __prod = (_bcq)->hdr.prod_idx;                                \
                __cons_pvt = (_bcq)->hdr.cons_pvt_idx;                        \
                if (BCQ_NO_WORK(__prod, __cons_pvt)) {                        \
                        __retval = -EBUSY;                                    \
                        break;                                                \
                }                                                             \
                __new_cons_pvt = (__cons_pvt + 1);                            \
        } while (!atomic_cas_u32(&(_bcq)->hdr.cons_pvt_idx, __cons_pvt,       \
                                 __new_cons_pvt));                            \
        if (!__retval) {                                                      \
                /* from here out, __cons_pvt is the local one that we won; */ \
                /* wait for the producer to finish copying the elem in */     \
                while (!(_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons) \
                        cpu_relax();                                          \
                *(_elem) = (_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].elem; \
                (_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons = FALSE; \
                /* wait til we're the cons_pub, then advance it by one */     \
                while ((_bcq)->hdr.cons_pub_idx != __cons_pvt)                \
                        cpu_relax_vc(vcore_id());                             \
                (_bcq)->hdr.cons_pub_idx = __cons_pvt + 1;                    \
        }                                                                     \
        __retval;                                                             \
})
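
/* Typical consumer-side use, as a sketch (q and handle_pkt are made up for
 * illustration); -EBUSY here just means there was nothing left to claim:
 *
 *      struct my_pkt pkt;
 *
 *      while (!bcq_dequeue(&q, &pkt, 16))
 *              handle_pkt(&pkt);
 */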

/* Checks if a bcq is empty (meaning no work left to claim), instead of
 * trying to dequeue */
#define bcq_empty(_bcq)                                                       \
        BCQ_NO_WORK((_bcq)->hdr.prod_idx, (_bcq)->hdr.cons_pvt_idx)

#define bcq_nr_full(_bcq)                                                     \
        ((_bcq)->hdr.prod_idx - (_bcq)->hdr.cons_pub_idx)
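
/* Note that these are unsynchronized snapshots of the indexes and can be
 * stale by the time the caller acts on them. A stats-style sketch (q is made
 * up for illustration, printk for a kernel caller):
 *
 *      if (!bcq_empty(&q))
 *              printk("bcq backlog: %u items\n", bcq_nr_full(&q));
 */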