/* Copyright (c) 2015 Google Inc.
* Barret Rhoden <brho@cs.berkeley.edu>
* See LICENSE for details.
*
 * Coalescing Event Queue: encapsulates the essence of epoll/kqueue in shared
* memory: a dense array of sticky status bits.
*
* User side (consumer).
*
* When initializing, the nr_events is the maximum count of events you are
* tracking, e.g. 100 FDs being tapped, but not the actual FD numbers. If you
* use a large number, don't worry about memory; the memory is reserved but only
* allocated on demand (i.e. mmap without MAP_POPULATE).
*
* The ring_sz is a rough guess of the number of concurrent events. It's not a
* big deal what you pick, but it must be a power of 2. Otherwise the kernel
* will probably scribble over your memory. If you pick a value that is too
* small, then the ring may overflow, triggering an O(n) scan of the events
* array (where n is the largest event ID ever seen). You could make it the
* nearest power of 2 >= nr_expected_events, for reasonable behavior at the
* expense of memory. It'll be very rare for the ring to have more entries than
* the array has events. */
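
/* A minimal consumer-side sketch of initialization (not part of this file;
 * CEQ_OR is assumed to be one of the operation constants from ceq.h, and the
 * struct is assumed to start out zeroed, since ceq_init() first cleans up
 * whatever was there before):
 *
 *	struct ceq my_ceq;
 *
 *	memset(&my_ceq, 0, sizeof(my_ceq));
 *	ceq_init(&my_ceq, CEQ_OR, 100, 16);	// 100 tracked IDs, ring of 16
 */
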
#include <parlib/ceq.h>
#include <parlib/arch/atomic.h>
#include <parlib/vcore.h>
#include <parlib/assert.h>
#include <parlib/spinlock.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>

void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
size_t ring_sz)
{
/* In case they already had an mbox initialized, cleanup whatever was
* there so we don't leak memory. They better not have asked for events
* before doing this init call... */
ceq_cleanup(ceq);
ceq->events = mmap(NULL, sizeof(struct ceq_event) * nr_events,
PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS,
-1, 0);
parlib_assert_perror(ceq->events != MAP_FAILED);
ceq->nr_events = nr_events;
atomic_init(&ceq->max_event_ever, 0);
assert(IS_PWR2(ring_sz));
	ceq->ring = malloc(sizeof(int32_t) * ring_sz);
	parlib_assert_perror(ceq->ring);
memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
ceq->ring_sz = ring_sz;
ceq->operation = op;
ceq->ring_overflowed = FALSE;
atomic_init(&ceq->prod_idx, 0);
atomic_init(&ceq->cons_pub_idx, 0);
atomic_init(&ceq->cons_pvt_idx, 0);
parlib_static_assert(sizeof(struct spin_pdr_lock) <=
sizeof(ceq->u_lock));
spin_pdr_init((struct spin_pdr_lock*)&ceq->u_lock);
}

/* Helper, returns an index into the events array from the ceq ring. -1 if the
* ring was empty when we looked (could be filled right after we looked). This
* is the same algorithm used with BCQs, but with a magic value (-1) instead of
* a bool to track whether or not the slot is ready for consumption. */
static int32_t get_ring_idx(struct ceq *ceq)
{
long pvt_idx, prod_idx;
int32_t ret;
do {
prod_idx = atomic_read(&ceq->prod_idx);
pvt_idx = atomic_read(&ceq->cons_pvt_idx);
if (__ring_empty(prod_idx, pvt_idx))
return -1;
} while (!atomic_cas(&ceq->cons_pvt_idx, pvt_idx, pvt_idx + 1));
/* We claimed our slot, which is pvt_idx. The new cons_pvt_idx is
* advanced by 1 for the next consumer. Now we need to wait on the
* kernel to fill the value: */
while ((ret = ceq->ring[pvt_idx & (ceq->ring_sz - 1)]) == -1)
cpu_relax();
/* Set the value back to -1 for the next time the slot is used */
ceq->ring[pvt_idx & (ceq->ring_sz - 1)] = -1;
/* We now have our entry. We need to make sure the pub_idx is updated.
* All consumers are doing this. We can just wait on all of them to
* update the cons_pub to our location, then we update it to the next.
*
* We're waiting on other vcores, but we don't know which one(s). */
while (atomic_read(&ceq->cons_pub_idx) != pvt_idx)
cpu_relax_any();
/* This is the only time we update cons_pub. We also know no one else
* is updating it at this moment; the while loop acts as a lock, such
* that no one gets to this point until pub == their pvt_idx, all of
* which are unique. */
	/* No rwmb needed, it's the same variable (cons_pub) */
atomic_set(&ceq->cons_pub_idx, pvt_idx + 1);
return ret;
}
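
/* For example, with ring_sz = 8 the mask is 7: pvt_idx values 0..7 map to ring
 * slots 0..7, pvt_idx 8 wraps back to slot 0, and so on. The prod/cons indices
 * are free-running counters; only the masked value selects a slot, which is
 * why ring_sz must be a power of 2. */
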
/* Helper, extracts a message from a ceq[idx], returning TRUE if there was a
* message. Note that there might have been nothing in the message (coal == 0).
 * Still, that counts; it's more about idx_posted. A concurrent reader could
* have swapped out the coal contents (imagine two consumers, each gets past the
* idx_posted check). If having an "empty" coal is a problem, then higher level
* software can ask for another event.
*
* Implied in all of that is that idx_posted is also racy. The consumer blindly
* sets it to false. So long as it extracts coal after doing so, we're fine. */
static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
{
struct ceq_event *ceq_ev = &ceq->events[idx];
if (!ceq_ev->idx_posted)
return FALSE;
/* Once we clear this flag, any new coalesces will trigger another ring
* event, so we don't need to worry about missing anything. It is
* possible that this CEQ event will get those new coalesces as part of
* this message, and future messages will have nothing. That's fine. */
ceq_ev->idx_posted = FALSE;
cmb(); /* order the read after the flag write. swap provides cpu_mb */
/* We extract the existing coals and reset the collection to 0; now the
* collected events are in our msg. */
msg->ev_arg2 = atomic_swap(&ceq_ev->coalesce, 0);
	/* If the user wants access to user_data, they can peek in the event
	 * array via ceq->events[msg->ev_type].user_data. */
msg->ev_type = idx;
msg->ev_arg3 = (void*)ceq_ev->blob_data;
ceq_ev->blob_data = 0; /* racy, but there are no blob guarantees */
return TRUE;
}
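
/* A sketch of how a consumer could get back to its own per-event state via
 * user_data after receiving a message (struct my_state is hypothetical, and
 * the cast depends on how user_data was stashed):
 *
 *	struct event_msg msg;
 *
 *	if (get_ceq_msg(ceq, &msg)) {
 *		struct my_state *s;
 *
 *		s = (struct my_state*)ceq->events[msg.ev_type].user_data;
 *		...
 *	}
 */
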
/* Consumer side, returns TRUE on success and fills *msg with the ev_msg. If
 * the ceq appears empty, it will return FALSE. Messages that arrived after we
 * started looking may not be received. */
bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
{
int32_t idx = get_ring_idx(ceq);
if (idx == -1) {
/* We didn't get anything via the ring, but if we're overflowed,
* then we need to look in the array directly. Note that we
* only handle overflow when we failed to get something.
* Eventually, we'll deal with overflow (which should be very
* rare). Also note that while we are dealing with overflow,
* the kernel could be producing and using the ring, and we
* could have consumers consuming from the ring.
*
* Overall, we need to clear the overflow flag, then check every
* event. If we find an event, we need to make sure the *next*
* consumer continues our recovery, hence the overflow_recovery
* field. We could do the check for recovery immediately, but
* that adds complexity and there's no stated guarantee of CEQ
* message ordering (you don't have it with the ring, either,
		 * technically (consider a coalesce)). So we're fine with
		 * having *a* consumer finish the recovery, but not
		 * necessarily the *next* consumer. So long as no one thinks
		 * the CEQ is empty when there actually are old messages,
		 * we're okay. */
if (!ceq->ring_overflowed && !ceq->overflow_recovery)
return FALSE;
spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
if (!ceq->overflow_recovery) {
ceq->overflow_recovery = TRUE;
/* set recovery before clearing overflow */
wmb();
ceq->ring_overflowed = FALSE;
ceq->last_recovered = 0;
/* clear overflowed before reading event entries */
wrmb();
}
for (int i = ceq->last_recovered;
i <= atomic_read(&ceq->max_event_ever);
i++) {
			/* Regardless of whether there's a msg here, we checked
* it. */
ceq->last_recovered++;
if (extract_ceq_msg(ceq, i, msg)) {
/* We found something. There might be more, but
* a future consumer will have to deal with it,
* or verify there isn't. */
spin_pdr_unlock(
(struct spin_pdr_lock*)&ceq->u_lock);
return TRUE;
}
}
ceq->overflow_recovery = FALSE;
		/* Made it to the end; looks like there was no overflow left.
		 * There could be new ones added behind us (they'd be in the
		 * ring, or overflow would be turned on again), but those
		 * messages were added after we started consuming and are
		 * therefore not our obligation to extract. */
spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
return FALSE;
}
if (!extract_ceq_msg(ceq, idx, msg))
return FALSE;
return TRUE;
}
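
/* Typical consumer usage is to drain the CEQ in a loop once it knows there is
 * something to look at; a sketch, where handle_event() stands in for whatever
 * the consumer does with a message:
 *
 *	struct event_msg msg;
 *
 *	while (get_ceq_msg(ceq, &msg))
 *		handle_event(&msg);
 */
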
/* pvt_idx is the next slot that a new consumer will try to consume. When
* pvt_idx != pub_idx, pub_idx is lagging, and it represents consumptions in
* progress. */
static bool __ceq_ring_is_empty(struct ceq *ceq)
{
return __ring_empty(atomic_read(&ceq->prod_idx),
atomic_read(&ceq->cons_pvt_idx));
}

bool ceq_is_empty(struct ceq *ceq)
{
if (!__ceq_ring_is_empty(ceq) ||
ceq->ring_overflowed ||
spin_pdr_locked((struct spin_pdr_lock*)&ceq->u_lock)) {
return FALSE;
}
return TRUE;
}
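
/* A consumer that wants to block once the CEQ is drained (as in the loop
 * sketched above) can use this as a final check; block_until_notified() is a
 * placeholder for however the caller waits for more events:
 *
 *	while (get_ceq_msg(ceq, &msg))
 *		handle_event(&msg);
 *	if (ceq_is_empty(ceq))
 *		block_until_notified();
 */
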
void ceq_cleanup(struct ceq *ceq)
{
munmap(ceq->events, sizeof(struct ceq_event) * ceq->nr_events);
free(ceq->ring);
}