/* Copyright (c) 2013 The Regents of the University of California
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Atomic pipes.  Multi-reader, multi-writer pipes, similar to sys_pipe except
 * that they operate on fixed sized chunks of data.
 *
 * A note on broadcast wakeups.  We broadcast in a few places.  If we don't,
 * then all paths (like error paths) will have to signal.  Not a big deal
 * either way, but just need to catch all the cases.  Other non-obvious
 * cases are that read and write methods need to wake other readers and
 * writers (in the absence of a broadcast wakeup) */

#include <apipe.h>
#include <ros/ring_buffer.h>
#include <string.h>
#include <stdio.h>

void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
                size_t elem_sz)
{
	ap->ap_buf = buf;
	/* power of two number of elements in the ring. */
	ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
	ap->ap_elem_sz = elem_sz;
	ap->ap_rd_off = 0;
	ap->ap_wr_off = 0;
	ap->ap_nr_readers = 1;
	ap->ap_nr_writers = 1;
	/* Three CVs, all using the same lock. */
	spinlock_init(&ap->ap_lock);
	cv_init_with_lock(&ap->ap_priority_reader, &ap->ap_lock);
	cv_init_with_lock(&ap->ap_general_readers, &ap->ap_lock);
	cv_init_with_lock(&ap->ap_writers, &ap->ap_lock);
	ap->ap_has_priority_reader = FALSE;
}

void apipe_open_reader(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_readers++;
	spin_unlock(&ap->ap_lock);
}

void apipe_open_writer(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_writers++;
	spin_unlock(&ap->ap_lock);
}

/* Helper: Wake the appropriate readers.  When there's a priority reader, only
 * that one wakes up.  It's up to the priority reader to wake the other readers,
 * by clearing has_prior and calling this again. */
static void __apipe_wake_readers(struct atomic_pipe *ap)
{
	if (ap->ap_has_priority_reader)
		__cv_signal(&ap->ap_priority_reader);
	else
		__cv_broadcast(&ap->ap_general_readers);
}

/* When closing, there might be others blocked waiting for us.  For example,
 * a writer could have blocked on a full pipe, waiting for us to read.  Instead
 * of reading, the last reader closes.  The writer needs to be woken up so it
 * can return 0. */
void apipe_close_reader(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_readers--;
	__cv_broadcast(&ap->ap_writers);
	spin_unlock(&ap->ap_lock);
}

void apipe_close_writer(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_writers--;
	__apipe_wake_readers(ap);
	spin_unlock(&ap->ap_lock);
}

/* read a pipe that is already locked. */
int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem)
{
	size_t rd_idx;
	int nr_copied = 0;

	for (int i = 0; i < nr_elem; i++) {
		/* readers that call read_locked directly might have failed to check for
		 * emptiness, so we'll double check early. */
		if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
			break;
		/* power of 2 elements in the ring buffer, index is the lower n bits */
		rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
		memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz, ap->ap_elem_sz);
		ap->ap_rd_off++;
		buf += ap->ap_elem_sz;
		nr_copied++;
	}
	/* We could have multiple writers blocked.  Just broadcast for them all.
	 * Alternatively, we could signal one, and then it's on the writers to
	 * signal further writers (see the note at the top of this file). */
	__cv_broadcast(&ap->ap_writers);
	return nr_copied;
}


int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
{
	size_t rd_idx;
	int nr_copied = 0;

	spin_lock(&ap->ap_lock);
	/* Need to wait til the priority reader is gone, and the ring isn't empty.
	 * If we do this as two steps, (either of priority check or empty check
	 * first), there's a chance the second one will fail, and when we sleep and
	 * wake up, the first condition could have changed.  (An alternative would
	 * be to block priority readers too, by promoting ourselves to a priority
	 * reader). */
	while (ap->ap_has_priority_reader ||
	       __ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
		if (!ap->ap_nr_writers) {
			spin_unlock(&ap->ap_lock);
			return 0;
		}
		cv_wait(&ap->ap_general_readers);
		cpu_relax();
	}
	/* This read call wakes up writers */
	nr_copied = apipe_read_locked(ap, buf, nr_elem);
	/* If the writer didn't broadcast, we'd need to wake other readers (imagine
	 * a long queue of blocked readers, and a queue filled by one massive
	 * write).  (same with the error case). */
	spin_unlock(&ap->ap_lock);
	return nr_copied;
}

int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
{
	size_t wr_idx;
	int nr_copied = 0;

	spin_lock(&ap->ap_lock);
	/* not sure if we want to check for readers first or not */
	while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
		if (!ap->ap_nr_readers) {
			spin_unlock(&ap->ap_lock);
			return 0;
		}
		cv_wait(&ap->ap_writers);
		cpu_relax();
	}
	for (int i = 0; i < nr_elem; i++) {
		/* power of 2 elements in the ring buffer, index is the lower n bits */
		wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
		memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf, ap->ap_elem_sz);
		ap->ap_wr_off++;
		buf += ap->ap_elem_sz;
		nr_copied++;
		if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
			break;
	}
	/* We only need to wake readers, since the reader that woke us used a
	 * broadcast.  o/w, we'd need to wake the next writer.  (same goes for the
	 * error case). */
	__apipe_wake_readers(ap);
	spin_unlock(&ap->ap_lock);
	return nr_copied;
}

void *apipe_head(struct atomic_pipe *ap)
{
	if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
		return 0;
	return ap->ap_buf + (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
}

/* 
 * Read data from the pipe until a condition is satisfied.
 * f is the function that determines the condition. f saves its
 * state in arg. When f returns non-zero, this function exits,
 * and returns the value to its caller. Note that f can return -1
 * to indicate an error. But returning zero will keep you trapped in
 * this function. The intent here is to ensure one-reader-at-a-time
 * operation.
 */
int apipe_read_cond(struct atomic_pipe *ap,
		    int(*f)(struct atomic_pipe *pipe, void *arg), void *arg)
{
	size_t rd_idx;
	int ret;

	spin_lock(&ap->ap_lock);
	/* Can only have one priority reader at a time.  Wait our turn. */
	while (ap->ap_has_priority_reader) {
		cv_wait(&ap->ap_general_readers);
		cpu_relax();
	}
	ap->ap_has_priority_reader = TRUE;
	while (1) {
		/* Each time there is a need to check the pipe, call
		 * f. f will maintain its state in arg. It is expected that f
		 * will dequeue elements from the pipe as they are available.
		 * N.B. this is being done for protocols like IPV4 that can
		 * fragment an RPC request. For IPV6, it is likely that this
		 * will end up looking like a blocking read. Thus was it ever
		 * with legacy code. F is supposed to call apipe_read_locked().
		 */
		ret = f(ap, arg);
		if (ret)
			break;
		/* if nr_writers goes to zero, that's bad.  return -1 because they're
		 * going to have to clean up.  We should have been able to call f once
		 * though, to pull out any remaining elements.  The main concern here is
		 * sleeping on the cv when no one (no writers) will wake us. */
		if (!ap->ap_nr_writers) {
			ret = -1;
			goto out;
		}
		cv_wait(&ap->ap_priority_reader);
		cpu_relax();
	}
out:
	/* All out paths need to wake other readers.  When we were woken up, there
	 * was no broadcast sent to the other readers.  Plus, there may be other
	 * potential priority readers. */
	ap->ap_has_priority_reader = FALSE;
	__apipe_wake_readers(ap);
	/* FYI, writers were woken up after an actual read.  If we had an error (ret
	 * == -1), there should be no writers. */
	spin_unlock(&ap->ap_lock);
	return ret;
}
