/* Copyright (c) 2015 Google Inc.
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Coalescing Event Queue: encapsulates the essence of epoll/kqueue in shared
 * memory: a dense array of sticky status bits.
 *
 * User side (consumer).
 *
 * When initializing, nr_events is the maximum count of events you are
 * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.  If you
 * use a large number, don't worry about memory; the memory is reserved but
 * only allocated on demand (i.e. mmap without MAP_POPULATE).
 *
 * The ring_sz is a rough guess of the number of concurrent events.  It's not a
 * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
 * will probably scribble over your memory.  If you pick a value that is too
 * small, then the ring may overflow, triggering an O(n) scan of the events
 * array (where n is the largest event ID ever seen).  You could make it the
 * nearest power of 2 >= nr_expected_events, for reasonable behavior at the
 * expense of memory.  It'll be very rare for the ring to have more entries
 * than the array has events. */
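
/* A minimal sketch of consumer usage, for orientation only.  It assumes a
 * CEQ_OR-style operation constant from the ceq headers and a hypothetical
 * handle_msg(); the sizes are arbitrary:
 *
 *	struct ceq ceq;
 *	struct event_msg msg;
 *
 *	ceq_init(&ceq, CEQ_OR, 100, 128);	// track 100 events, ring_sz 2^7
 *	...					// kernel produces events
 *	while (get_ceq_msg(&ceq, &msg))
 *		handle_msg(&msg);		// hypothetical consumer handler
 *	ceq_cleanup(&ceq);
 */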

#include <parlib/ceq.h>
#include <parlib/arch/atomic.h>
#include <parlib/vcore.h>
#include <parlib/assert.h>
#include <parlib/spinlock.h>
#include <parlib/stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>

void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
	      size_t ring_sz)
{
	/* In case they already had an mbox initialized, cleanup whatever was
	 * there so we don't leak memory.  They better not have asked for
	 * events before doing this init call... */
	ceq_cleanup(ceq);
	ceq->events = mmap(NULL, sizeof(struct ceq_event) * nr_events,
			   PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS,
			   -1, 0);
	parlib_assert_perror(ceq->events != MAP_FAILED);
	ceq->nr_events = nr_events;
	atomic_init(&ceq->max_event_ever, 0);
	assert(IS_PWR2(ring_sz));
	ceq->ring = malloc(sizeof(int32_t) * ring_sz);
	memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
	ceq->ring_sz = ring_sz;
	ceq->operation = op;
	ceq->ring_overflowed = FALSE;
	atomic_init(&ceq->prod_idx, 0);
	atomic_init(&ceq->cons_pub_idx, 0);
	atomic_init(&ceq->cons_pvt_idx, 0);
	parlib_static_assert(sizeof(struct spin_pdr_lock) <=
			     sizeof(ceq->u_lock));
	spin_pdr_init((struct spin_pdr_lock*)&ceq->u_lock);
}

/* Helper, returns an index into the events array from the ceq ring.  -1 if the
 * ring was empty when we looked (could be filled right after we looked).  This
 * is the same algorithm used with BCQs, but with a magic value (-1) instead of
 * a bool to track whether or not the slot is ready for consumption. */
static int32_t get_ring_idx(struct ceq *ceq)
{
	long pvt_idx, prod_idx;
	int32_t ret;

	do {
		prod_idx = atomic_read(&ceq->prod_idx);
		pvt_idx = atomic_read(&ceq->cons_pvt_idx);
		if (__ring_empty(prod_idx, pvt_idx))
			return -1;
	} while (!atomic_cas(&ceq->cons_pvt_idx, pvt_idx, pvt_idx + 1));
	/* We claimed our slot, which is pvt_idx.  The new cons_pvt_idx is
	 * advanced by 1 for the next consumer.  Now we need to wait on the
	 * kernel to fill the value: */
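	/* Note: ring_sz is a power of 2, so "& (ring_sz - 1)" below is a cheap
	 * "% ring_sz".  This masking is also why a non-power-of-2 ring_sz
	 * would skip/alias slots and corrupt memory (see the header comment).
	 */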
	while ((ret = ceq->ring[pvt_idx & (ceq->ring_sz - 1)]) == -1)
		cpu_relax();
	/* Set the value back to -1 for the next time the slot is used */
	ceq->ring[pvt_idx & (ceq->ring_sz - 1)] = -1;
	/* We now have our entry.  We need to make sure the pub_idx is updated.
	 * All consumers are doing this.  We can just wait on all of them to
	 * update the cons_pub to our location, then we update it to the next.
	 *
	 * We're waiting on other vcores, but we don't know which one(s). */
	while (atomic_read(&ceq->cons_pub_idx) != pvt_idx)
		cpu_relax_any();
	/* This is the only time we update cons_pub.  We also know no one else
	 * is updating it at this moment; the while loop acts as a lock, such
	 * that no one gets to this point until pub == their pvt_idx, all of
	 * which are unique. */
	/* No rwmb needed, it's the same variable (cons_pub) */
	atomic_set(&ceq->cons_pub_idx, pvt_idx + 1);
	return ret;
}

/* Helper, extracts a message from ceq[idx], returning TRUE if there was a
 * message.  Note that there might have been nothing in the message (coal ==
 * 0).  Still, that counts; it's more about idx_posted.  A concurrent reader
 * could have swapped out the coal contents (imagine two consumers, each gets
 * past the idx_posted check).  If having an "empty" coal is a problem, then
 * higher level software can ask for another event.
 *
 * Implied in all of that is that idx_posted is also racy.  The consumer blindly
 * sets it to false.  So long as it extracts coal after doing so, we're fine. */
static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
{
	struct ceq_event *ceq_ev = &ceq->events[idx];

	if (!ceq_ev->idx_posted)
		return FALSE;
	/* Once we clear this flag, any new coalesces will trigger another ring
	 * event, so we don't need to worry about missing anything.  It is
	 * possible that this CEQ event will get those new coalesces as part of
	 * this message, and future messages will have nothing.  That's fine. */
	ceq_ev->idx_posted = FALSE;
	cmb();	/* order the read after the flag write.  swap provides cpu_mb */
	/* We extract the existing coals and reset the collection to 0; now the
	 * collected events are in our msg. */
	msg->ev_arg2 = atomic_swap(&ceq_ev->coalesce, 0);
	/* If the user wants access to user_data, they can peek in the event
	 * array via ceq->events[msg->ev_type].user_data. */
	msg->ev_type = idx;
	msg->ev_arg3 = (void*)ceq_ev->blob_data;
	ceq_ev->blob_data = 0;	/* racy, but there are no blob guarantees */
	return TRUE;
}
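
/* For context, a rough sketch of what a producer (the kernel) does per event.
 * This is a paraphrase for orientation only, not the kernel's actual code; the
 * exact helpers and ordering live on the kernel side:
 *
 *	ceq_ev = &ceq->events[ev_type];
 *	// coalesce the payload per ceq->operation (e.g. OR or ADD it into
 *	// ceq_ev->coalesce), then:
 *	if (!ceq_ev->idx_posted) {
 *		ceq_ev->idx_posted = TRUE;
 *		// publish ev_type into the ring; if the ring is full,
 *		// set ceq->ring_overflowed instead
 *	}
 *
 * This is why the consumer above clears idx_posted *before* swapping out
 * coalesce: anything coalesced after the clear triggers a fresh posting, so
 * nothing is lost. */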

/* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
 * the ceq appears empty, it will return FALSE.  Messages that arrive after we
 * start looking may not be received by this call. */
bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
{
	int32_t idx = get_ring_idx(ceq);

	if (idx == -1) {
		/* We didn't get anything via the ring, but if we're
		 * overflowed, then we need to look in the array directly.
		 * Note that we only handle overflow when we failed to get
		 * something; eventually we'll deal with the overflow, which
		 * should be very rare.  Also note that while we are dealing
		 * with overflow, the kernel could be producing and using the
		 * ring, and we could have consumers consuming from the ring.
		 *
		 * Overall, we need to clear the overflow flag, then check
		 * every event.  If we find an event, we need to make sure the
		 * *next* consumer continues our recovery, hence the
		 * overflow_recovery field.  We could do the check for recovery
		 * immediately, but that adds complexity and there's no stated
		 * guarantee of CEQ message ordering (you don't have it with
		 * the ring, either, technically (consider a coalesce)).  So
		 * we're fine by having *a* consumer finish the recovery, but
		 * not necessarily the *next* consumer.  So long as no one
		 * thinks the CEQ is empty when there actually are old
		 * messages, then we're okay. */
		if (!ceq->ring_overflowed && !ceq->overflow_recovery)
			return FALSE;
		spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
		if (!ceq->overflow_recovery) {
			ceq->overflow_recovery = TRUE;
			/* set recovery before clearing overflow */
			wmb();
			ceq->ring_overflowed = FALSE;
			ceq->last_recovered = 0;
			/* clear overflowed before reading event entries */
			wrmb();
		}
		for (int i = ceq->last_recovered;
		     i <= atomic_read(&ceq->max_event_ever);
		     i++) {
			/* Regardless of whether there's a msg here, we checked
			 * it. */
			ceq->last_recovered++;
			if (extract_ceq_msg(ceq, i, msg)) {
				/* We found something.  There might be more,
				 * but a future consumer will have to deal with
				 * it, or verify there isn't. */
				spin_pdr_unlock(
					(struct spin_pdr_lock*)&ceq->u_lock);
				return TRUE;
			}
		}
		ceq->overflow_recovery = FALSE;
		/* Made it to the end; looks like there was no overflow left.
		 * There could be new ones added behind us (they'd be in the
		 * ring, or overflow would be turned on again), but those
		 * messages were added after we started consuming, and
		 * therefore are not our obligation to extract. */
		spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
		return FALSE;
	}
	if (!extract_ceq_msg(ceq, idx, msg))
		return FALSE;
	return TRUE;
}

/* pvt_idx is the next slot that a new consumer will try to consume.  When
 * pvt_idx != pub_idx, pub_idx is lagging, and it represents consumptions in
 * progress. */
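/* For example (ignoring wraparound): prod_idx = 5, cons_pvt_idx = 4, and
 * cons_pub_idx = 2 means slots 2 and 3 are claimed and still being consumed,
 * slot 4 is the next one a consumer will claim, and the ring isn't empty until
 * cons_pvt_idx catches up to prod_idx.  (Illustrative numbers only.) */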
static bool __ceq_ring_is_empty(struct ceq *ceq)
{
	return __ring_empty(atomic_read(&ceq->prod_idx),
			    atomic_read(&ceq->cons_pvt_idx));
}

bool ceq_is_empty(struct ceq *ceq)
{
	if (!__ceq_ring_is_empty(ceq) ||
	    ceq->ring_overflowed ||
	    spin_pdr_locked((struct spin_pdr_lock*)&ceq->u_lock)) {
		return FALSE;
	}
	return TRUE;
}

void ceq_cleanup(struct ceq *ceq)
{
	munmap(ceq->events, sizeof(struct ceq_event) * ceq->nr_events);
	free(ceq->ring);
}