/* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
 * Portions Copyright © 1997-1999 Vita Nuova Limited
 * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
 *                                (www.vitanuova.com)
 * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
 *
 * Modified for the Akaros operating system:
 * Copyright (c) 2013-2014 The Regents of the University of California
 * Copyright (c) 2013-2015 Google Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE. */

#include <slab.h>
#include <kmalloc.h>
#include <kref.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <error.h>
#include <cpio.h>
#include <pmap.h>
#include <smp.h>
#include <net/ip.h>

#define PANIC_EXTRA(b)							\
{									\
	if ((b)->extra_len) {						\
		printblock(b);						\
		backtrace();						\
		panic("%s doesn't handle extra_data", __FUNCTION__);	\
	}								\
}

static uint32_t padblockcnt;
static uint32_t concatblockcnt;
static uint32_t pullupblockcnt;
static uint32_t copyblockcnt;
static uint32_t consumecnt;
static uint32_t producecnt;
static uint32_t qcopycnt;

static int debugging;

#define QDEBUG	if(0)

/*
 *  IO queues
 */

struct queue {
	spinlock_t lock;;

	struct block *bfirst;		/* buffer */
	struct block *blast;

	int dlen;			/* data bytes in queue */
	int limit;			/* max bytes in queue */
	int inilim;			/* initial limit */
	int state;
	int eof;			/* number of eofs read by user */
	size_t bytes_read;

	void (*kick) (void *);		/* restart output */
	void (*bypass) (void *, struct block *); /* bypass queue altogether */
	void *arg;			/* argument to kick */

	struct rendez rr;		/* process waiting to read */
	struct rendez wr;		/* process waiting to write */
	qio_wake_cb_t wake_cb;		/* callbacks for qio wakeups */
	void *wake_data;

	char err[ERRMAX];
};

enum {
	Maxatomic = 64 * 1024,
	QIO_CAN_ERR_SLEEP = (1 << 0),	/* can throw errors or block/sleep */
	QIO_LIMIT = (1 << 1),		/* respect q->limit */
	QIO_DROP_OVERFLOW = (1 << 2),	/* alternative to qdropoverflow */
	QIO_JUST_ONE_BLOCK = (1 << 3),	/* when qbreading, just get one block */
	QIO_NON_BLOCK = (1 << 4),	/* throw EAGAIN instead of blocking */
	QIO_DONT_KICK = (1 << 5),	/* don't kick when waking */
};

unsigned int qiomaxatomic = Maxatomic;

static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                              int mem_flags);
static bool qwait_and_ilock(struct queue *q, int qio_flags);

/* Helper: fires a wake callback, sending 'filter' */
static void qwake_cb(struct queue *q, int filter)
{
	if (q->wake_cb)
		q->wake_cb(q, q->wake_data, filter);
}

void ixsummary(void)
{
	debugging ^= 1;
	printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
		   padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
	printd("consume %lu, produce %lu, qcopy %lu\n",
		   consumecnt, producecnt, qcopycnt);
}

/*
 *  pad a block to the front (or the back if size is negative)
 */
struct block *padblock(struct block *bp, int size)
{
	int n;
	struct block *nbp;

	QDEBUG checkb(bp, "padblock 1");
	if (size >= 0) {
		if (bp->rp - bp->base >= size) {
			bp->network_offset += size;
			bp->transport_offset += size;
			bp->rp -= size;
			return bp;
		}

		PANIC_EXTRA(bp);
		if (bp->next)
			panic("%s %p had a next", __func__, bp);
		n = BLEN(bp);
		padblockcnt++;
		nbp = block_alloc(size + n, MEM_WAIT);
		block_copy_metadata(nbp, bp);
		nbp->rp += size;
		nbp->wp = nbp->rp;
		memmove(nbp->wp, bp->rp, n);
		nbp->wp += n;
		freeb(bp);
		nbp->rp -= size;
	} else {
		size = -size;

		PANIC_EXTRA(bp);

		if (bp->next)
			panic("%s %p had a next", __func__, bp);

		if (bp->lim - bp->wp >= size)
			return bp;

		n = BLEN(bp);
		padblockcnt++;
		nbp = block_alloc(size + n, MEM_WAIT);
		block_copy_metadata(nbp, bp);
		memmove(nbp->wp, bp->rp, n);
		nbp->wp += n;
		freeb(bp);
	}
	QDEBUG checkb(nbp, "padblock 1");
	return nbp;
}

/*
 *  return count of bytes in a string of blocks
 */
int blocklen(struct block *bp)
{
	int len;

	len = 0;
	while (bp) {
		len += BLEN(bp);
		bp = bp->next;
	}
	return len;
}

/*
 * return count of space in blocks
 */
int blockalloclen(struct block *bp)
{
	int len;

	len = 0;
	while (bp) {
		len += BALLOC(bp);
		bp = bp->next;
	}
	return len;
}

/*
 *  copy the  string of blocks into
 *  a single block and free the string
 */
struct block *concatblock(struct block *bp)
{
	int len;
	struct block *nb, *f;

	if (bp->next == 0)
		return bp;

	/* probably use parts of qclone */
	PANIC_EXTRA(bp);
	nb = block_alloc(blocklen(bp), MEM_WAIT);
	for (f = bp; f; f = f->next) {
		len = BLEN(f);
		memmove(nb->wp, f->rp, len);
		nb->wp += len;
	}
	concatblockcnt += BLEN(nb);
	freeblist(bp);
	QDEBUG checkb(nb, "concatblock 1");
	return nb;
}

/* Makes an identical copy of the block, collapsing all the data into the block
 * body.  It does not point to the contents of the original, it is a copy
 * (unlike qclone).  Since we're copying, we might as well put the memory into
 * one contiguous chunk. */
struct block *copyblock(struct block *bp, int mem_flags)
{
	struct block *newb;
	struct extra_bdata *ebd;
	size_t amt;

	QDEBUG checkb(bp, "copyblock 0");
	newb = block_alloc(BLEN(bp), mem_flags);
	if (!newb)
		return 0;
	amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
	assert(amt == BHLEN(bp));
	for (int i = 0; i < bp->nr_extra_bufs; i++) {
		ebd = &bp->extra_data[i];
		if (!ebd->base || !ebd->len)
			continue;
		amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off,
					 ebd->len);
		assert(amt == ebd->len);
	}
	block_copy_metadata(newb, bp);
	copyblockcnt++;
	QDEBUG checkb(newb, "copyblock 1");
	return newb;
}

/* Returns a block with the remaining contents of b all in the main body of the
 * returned block.  Replace old references to b with the returned value (which
 * may still be 'b', if no change was needed. */
struct block *linearizeblock(struct block *b)
{
	struct block *newb;

	if (!b->extra_len)
		return b;
	newb = copyblock(b, MEM_WAIT);
	freeb(b);
	return newb;
}

/* Make sure the first block has at least n bytes in its main body.  Pulls up
 * data from the *list* of blocks.  Returns 0 if there is not enough data in the
 * block list. */
struct block *pullupblock(struct block *bp, int n)
{
	int i, len, seglen;
	struct block *nbp;
	struct extra_bdata *ebd;

	/*
	 *  this should almost always be true, it's
	 *  just to avoid every caller checking.
	 */
	if (BHLEN(bp) >= n)
		return bp;

	/* If there's no chance, just bail out now.  This might be slightly
	 * wasteful if there's a long blist that does have enough data. */
	if (n > blocklen(bp))
		return 0;
	/* a start at explicit main-body / header management */
	if (bp->extra_len) {
		if (n > bp->lim - bp->rp) {
			/* would need to realloc a new block and copy everything
			 * over. */
			panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
			      n, bp->lim, bp->rp, bp->lim-bp->rp);
		}
		len = n - BHLEN(bp);
		/* Would need to recursively call this, or otherwise pull from
		 * later blocks and put chunks of their data into the block
		 * we're building. */
		if (len > bp->extra_len)
			panic("pullup more than extra (%d, %d, %d)\n",
			      n, BHLEN(bp), bp->extra_len);
		QDEBUG checkb(bp, "before pullup");
		for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
			ebd = &bp->extra_data[i];
			if (!ebd->base || !ebd->len)
				continue;
			seglen = MIN(ebd->len, len);
			memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
			bp->wp += seglen;
			len -= seglen;
			ebd->len -= seglen;
			ebd->off += seglen;
			bp->extra_len -= seglen;
			if (ebd->len == 0) {
				kfree((void *)ebd->base);
				ebd->off = 0;
				ebd->base = 0;
			}
		}
		/* maybe just call pullupblock recursively here */
		if (len)
			panic("pullup %d bytes overdrawn\n", len);
		QDEBUG checkb(bp, "after pullup");
		return bp;
	}

	/*
	 *  if not enough room in the first block,
	 *  add another to the front of the list.
	 */
	if (bp->lim - bp->rp < n) {
		nbp = block_alloc(n, MEM_WAIT);
		nbp->next = bp;
		bp = nbp;
	}

	/*
	 *  copy bytes from the trailing blocks into the first
	 */
	n -= BLEN(bp);
	while ((nbp = bp->next)) {
		i = BLEN(nbp);
		if (i > n) {
			memmove(bp->wp, nbp->rp, n);
			pullupblockcnt++;
			bp->wp += n;
			nbp->rp += n;
			QDEBUG checkb(bp, "pullupblock 1");
			return bp;
		} else {
			memmove(bp->wp, nbp->rp, i);
			pullupblockcnt++;
			bp->wp += i;
			bp->next = nbp->next;
			nbp->next = 0;
			freeb(nbp);
			n -= i;
			if (n == 0) {
				QDEBUG checkb(bp, "pullupblock 2");
				return bp;
			}
		}
	}
	freeb(bp);
	return 0;
}

/*
 *  make sure the first block has at least n bytes in its main body
 */
struct block *pullupqueue(struct queue *q, int n)
{
	struct block *b;

	/* TODO: lock to protect the queue links? */
	if ((BHLEN(q->bfirst) >= n))
		return q->bfirst;
	q->bfirst = pullupblock(q->bfirst, n);
	for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
	q->blast = b;
	return q->bfirst;
}

/* throw away count bytes from the front of
 * block's extradata.  Returns count of bytes
 * thrown away
 */

static int pullext(struct block *bp, int count)
{
	struct extra_bdata *ed;
	int i, rem, bytes = 0;

	for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
		ed = &bp->extra_data[i];
		rem = MIN(count, ed->len);
		bp->extra_len -= rem;
		count -= rem;
		bytes += rem;
		ed->off += rem;
		ed->len -= rem;
		if (ed->len == 0) {
			kfree((void *)ed->base);
			ed->base = 0;
			ed->off = 0;
		}
	}
	return bytes;
}

/* throw away count bytes from the end of a
 * block's extradata.  Returns count of bytes
 * thrown away
 */

static int dropext(struct block *bp, int count)
{
	struct extra_bdata *ed;
	int i, rem, bytes = 0;

	for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
		ed = &bp->extra_data[i];
		rem = MIN(count, ed->len);
		bp->extra_len -= rem;
		count -= rem;
		bytes += rem;
		ed->len -= rem;
		if (ed->len == 0) {
			kfree((void *)ed->base);
			ed->base = 0;
			ed->off = 0;
		}
	}
	return bytes;
}

/*
 *  throw away up to count bytes from a
 *  list of blocks.  Return count of bytes
 *  thrown away.
 */
static int _pullblock(struct block **bph, int count, int free)
{
	struct block *bp;
	int n, bytes;

	bytes = 0;
	if (bph == NULL)
		return 0;

	while (*bph != NULL && count != 0) {
		bp = *bph;

		n = MIN(BHLEN(bp), count);
		bytes += n;
		count -= n;
		bp->rp += n;
		n = pullext(bp, count);
		bytes += n;
		count -= n;
		QDEBUG checkb(bp, "pullblock ");
		if (BLEN(bp) == 0 && (free || count)) {
			*bph = bp->next;
			bp->next = NULL;
			freeb(bp);
		}
	}
	return bytes;
}

int pullblock(struct block **bph, int count)
{
	return _pullblock(bph, count, 1);
}

/*
 *  trim to len bytes starting at offset
 */
struct block *trimblock(struct block *bp, int offset, int len)
{
	uint32_t l, trim;
	int olen = len;

	QDEBUG checkb(bp, "trimblock 1");
	if (blocklen(bp) < offset + len) {
		freeblist(bp);
		return NULL;
	}

	l =_pullblock(&bp, offset, 0);
	if (bp == NULL)
		return NULL;
	if (l != offset) {
		freeblist(bp);
		return NULL;
	}

	while ((l = BLEN(bp)) < len) {
		len -= l;
		bp = bp->next;
	}

	trim = BLEN(bp) - len;
	trim -= dropext(bp, trim);
	bp->wp -= trim;

	if (bp->next) {
		freeblist(bp->next);
		bp->next = NULL;
	}
	return bp;
}

/* Adjust block @bp so that its size is exactly @len.
 * If the size is increased, fill in the new contents with zeros.
 * If the size is decreased, discard some of the old contents at the tail. */
struct block *adjustblock(struct block *bp, int len)
{
	struct extra_bdata *ebd;
	void *buf;
	int i;

	if (len < 0) {
		freeb(bp);
		return NULL;
	}

	if (len == BLEN(bp))
		return bp;

	/* Shrink within block main body. */
	if (len <= BHLEN(bp)) {
		free_block_extra(bp);
		bp->wp = bp->rp + len;
		QDEBUG checkb(bp, "adjustblock 1");
		return bp;
	}

	/* Need to grow. */
	if (len > BLEN(bp)) {
		/* Grow within block main body. */
		if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
			memset(bp->wp, 0, len - BLEN(bp));
			bp->wp = bp->rp + len;
			QDEBUG checkb(bp, "adjustblock 2");
			return bp;
		}
		/* Grow with extra data buffers. */
		buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
		block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
				   MEM_WAIT);
		QDEBUG checkb(bp, "adjustblock 3");
		return bp;
	}

	/* Shrink extra data buffers.
	 * len is how much of ebd we need to keep.
	 * extra_len is re-accumulated. */
	assert(bp->extra_len > 0);
	len -= BHLEN(bp);
	bp->extra_len = 0;
	for (i = 0; i < bp->nr_extra_bufs; i++) {
		ebd = &bp->extra_data[i];
		if (len <= ebd->len)
			break;
		len -= ebd->len;
		bp->extra_len += ebd->len;
	}
	/* If len becomes zero, extra_data[i] should be freed. */
	if (len > 0) {
		ebd = &bp->extra_data[i];
		ebd->len = len;
		bp->extra_len += ebd->len;
		i++;
	}
	for (; i < bp->nr_extra_bufs; i++) {
		ebd = &bp->extra_data[i];
		if (ebd->base)
			kfree((void*)ebd->base);
		ebd->base = ebd->off = ebd->len = 0;
	}
	QDEBUG checkb(bp, "adjustblock 4");
	return bp;
}

/* Helper: removes and returns the first block from q */
static struct block *pop_first_block(struct queue *q)
{
	struct block *b = q->bfirst;

	q->dlen -= BLEN(b);
	q->bytes_read += BLEN(b);
	q->bfirst = b->next;
	b->next = 0;
	return b;
}

/* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
{
	copy_amt = MIN(to->lim - to->wp, copy_amt);
	memcpy(to->wp, from, copy_amt);
	to->wp += copy_amt;
	return copy_amt;
}

/* Accounting helper.  Block b in q lost amt extra_data */
static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
{
	b->extra_len -= amt;
	q->dlen -= amt;
	q->bytes_read += amt;
}

/* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
 * fixed in 'from', so we move its contents and zero it out in 'from'.
 *
 * Returns the length moved (0 on failure). */
static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
                       struct block *from, struct queue *from_q)
{
	size_t ret = ebd->len;

	if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
		return 0;
	block_and_q_lost_extra(from, from_q, ebd->len);
	ebd->base = ebd->len = ebd->off = 0;
	return ret;
}

/* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
 * return with less than len, but greater than 0, even if there is more
 * available in q.
 *
 * At any moment that we have copied anything and things are tricky, we can just
 * return.  The trickiness comes from a bunch of variables: is the main body
 * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
 * to @to's main body, but only if we haven't used it yet. */
static size_t copy_from_first_block(struct queue *q, struct block *to,
                                    size_t len)
{
	struct block *from = q->bfirst;
	size_t copy_amt, amt;
	struct extra_bdata *ebd;

	assert(len < BLEN(from));	/* sanity */
	/* Try to extract from the main body */
	copy_amt = MIN(BHLEN(from), len);
	if (copy_amt) {
		copy_amt = copy_to_block_body(to, from->rp, copy_amt);
		from->rp += copy_amt;
		/* We only change dlen, (data len), not q->len, since the q
		 * still has the same block memory allocation (no kfrees
		 * happened) */
		q->dlen -= copy_amt;
		q->bytes_read += copy_amt;
	}
	/* Try to extract the remainder from the extra data */
	len -= copy_amt;
	for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
		ebd = &from->extra_data[i];
		if (!ebd->base || !ebd->len)
			continue;
		if (len >= ebd->len) {
			amt = move_ebd(ebd, to, from, q);
			if (!amt) {
				/* our internal alloc could have failed.   this
				 * ebd is now the last one we'll consider.
				 * let's handle it separately and put it in the
				 * main body. */
				if (copy_amt)
					return copy_amt;
				copy_amt = copy_to_block_body(to,
							      (void*)ebd->base +
							      ebd->off,
				                              ebd->len);
				block_and_q_lost_extra(from, q, copy_amt);
				break;
			}
			len -= amt;
			copy_amt += amt;
			continue;
		} else {
			/* If we're here, we reached our final ebd, which we'll
			 * need to split to get anything from it. */
			if (copy_amt)
				return copy_amt;
			copy_amt = copy_to_block_body(to, (void*)ebd->base +
						      ebd->off, len);
			ebd->off += copy_amt;
			ebd->len -= copy_amt;
			block_and_q_lost_extra(from, q, copy_amt);
			break;
		}
	}
	if (len)
		assert(copy_amt);	/* sanity */
	return copy_amt;
}

/* Return codes for __qbread and __try_qbread. */
enum {
	QBR_OK,
	QBR_FAIL,
	QBR_SPARE,	/* we need a spare block */
	QBR_AGAIN,	/* do it again, we are coalescing blocks */
};

/* Helper and back-end for __qbread: extracts and returns a list of blocks
 * containing up to len bytes.  It may contain less than len even if q has more
 * data.
 *
 * Returns a code interpreted by __qbread, and the returned blist in ret. */
static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                        struct block **real_ret, struct block *spare)
{
	struct block *ret, *ret_last, *first;
	size_t blen;
	bool was_unwritable = FALSE;

	if (qio_flags & QIO_CAN_ERR_SLEEP) {
		if (!qwait_and_ilock(q, qio_flags)) {
			spin_unlock_irqsave(&q->lock);
			return QBR_FAIL;
		}
		/* we qwaited and still hold the lock, so the q is not empty */
		first = q->bfirst;
	} else {
		spin_lock_irqsave(&q->lock);
		first = q->bfirst;
		if (!first) {
			spin_unlock_irqsave(&q->lock);
			return QBR_FAIL;
		}
	}
	/* We need to check before adjusting q->len.  We're checking the
	 * writer's sleep condition / tap condition.  When set, we *might* be
	 * making an edge transition (from unwritable to writable), which needs
	 * to wake and fire taps.  But, our read might not drain the queue below
	 * q->lim.  We'll check again later to see if we should really wake
	 * them.  */
	was_unwritable = !qwritable(q);
	blen = BLEN(first);
	if ((q->state & Qcoalesce) && (blen == 0)) {
		freeb(pop_first_block(q));
		spin_unlock_irqsave(&q->lock);
		/* Need to retry to make sure we have a first block */
		return QBR_AGAIN;
	}
	/* Qmsg: just return the first block.  Be careful, since our caller
	 * might not read all of the block and thus drop bytes.  Similar to
	 * SOCK_DGRAM. */
	if (q->state & Qmsg) {
		ret = pop_first_block(q);
		goto out_ok;
	}
	/* Let's get at least something first - makes the code easier.  This
	 * way, we'll only ever split the block once. */
	if (blen <= len) {
		ret = pop_first_block(q);
		len -= blen;
	} else {
		/* need to split the block.  we won't actually take the first
		 * block out of the queue - we're just extracting a little bit.
		 */
		if (!spare) {
			/* We have nothing and need a spare block.  Retry! */
			spin_unlock_irqsave(&q->lock);
			return QBR_SPARE;
		}
		copy_from_first_block(q, spare, len);
		ret = spare;
		goto out_ok;
	}
	/* At this point, we just grabbed the first block.  We can try to grab
	 * some more, up to len (if they want). */
	if (qio_flags & QIO_JUST_ONE_BLOCK)
		goto out_ok;
	ret_last = ret;
	while (q->bfirst && (len > 0)) {
		blen = BLEN(q->bfirst);
		if ((q->state & Qcoalesce) && (blen == 0)) {
			/* remove the intermediate 0 blocks */
			freeb(pop_first_block(q));
			continue;
		}
		if (blen > len) {
			/* We could try to split the block, but that's a huge
			 * pain.  For instance, we might need to move the main
			 * body of b into an extra_data of ret_last.  lots of
			 * ways for that to fail, and lots of cases to consider.
			 * Easier to just bail out.  This is why I did the first
			 * block above: we don't need to worry about this. */
			 break;
		}
		ret_last->next = pop_first_block(q);
		ret_last = ret_last->next;
		len -= blen;
	}
out_ok:
	/* Don't wake them up or fire tap if we didn't drain enough. */
	if (!qwritable(q))
		was_unwritable = FALSE;
	spin_unlock_irqsave(&q->lock);
	if (was_unwritable) {
		if (q->kick && !(qio_flags & QIO_DONT_KICK))
			q->kick(q->arg);
		rendez_wakeup(&q->wr);
		qwake_cb(q, FDTAP_FILT_WRITABLE);
	}
	*real_ret = ret;
	return QBR_OK;
}

/* Helper and front-end for __try_qbread: extracts and returns a list of blocks
 * containing up to len bytes.  It may contain less than len even if q has more
 * data.
 *
 * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
 * if it required a spare and the memory allocation failed.
 *
 * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
 * could get a zero length block back. */
static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                              int mem_flags)
{
	ERRSTACK(1);
	struct block *ret = 0;
	struct block *volatile spare = 0;	/* volatile for the waserror */

	/* __try_qbread can throw, based on qio flags. */
	if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
		if (spare)
			freeb(spare);
		nexterror();
	}
	while (1) {
		switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
		case QBR_OK:
		case QBR_FAIL:
			if (spare && (ret != spare))
				freeb(spare);
			goto out_ret;
		case QBR_SPARE:
			assert(!spare);
			/* Due to some nastiness, we need a fresh block so we
			 * can read out anything from the queue.  'len' seems
			 * like a reasonable amount.  Maybe we can get away with
			 * less. */
			spare = block_alloc(len, mem_flags);
			if (!spare) {
				/* Careful here: a memory failure (possible with
				 * MEM_ATOMIC) could look like 'no data in the
				 * queue' (QBR_FAIL).  The only one who does is
				 * this qget(), who happens to know that we
				 * won't need a spare, due to the len argument.
				 * Spares are only needed when we need to split
				 * a block. */
				ret = 0;
				goto out_ret;
			}
			break;
		case QBR_AGAIN:
			/* if the first block is 0 and we are Qcoalesce, then
			 * we'll need to try again.  We bounce out of __try so
			 * we can perform the "is there a block" logic again
			 * from the top. */
			break;
		}
	}
	assert(0);
out_ret:
	if (qio_flags & QIO_CAN_ERR_SLEEP)
		poperror();
	return ret;
}

/*
 *  get next block from a queue, return null if nothing there
 */
struct block *qget(struct queue *q)
{
	/* since len == SIZE_MAX, we should never need to do a mem alloc */
	return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
}

/* Throw away the next 'len' bytes in the queue returning the number actually
 * discarded.
 *
 * If the bytes are in the queue, then they must be discarded.  The only time to
 * return less than len is if the q itself has less than len bytes.
 *
 * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
 * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
size_t qdiscard(struct queue *q, size_t len)
{
	struct block *blist;
	size_t removed_amt;
	size_t sofar = 0;

	/* This is racy.  There could be multiple qdiscarders or other
	 * consumers, where the consumption could be interleaved. */
	while (qlen(q) && len) {
		blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
		removed_amt = freeblist(blist);
		sofar += removed_amt;
		len -= removed_amt;
	}
	return sofar;
}

ssize_t qpass(struct queue *q, struct block *b)
{
	return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
}

ssize_t qpassnolim(struct queue *q, struct block *b)
{
	return __qbwrite(q, b, 0);
}

/*
 *  if the allocated space is way out of line with the used
 *  space, reallocate to a smaller block
 */
struct block *packblock(struct block *bp)
{
	struct block **l, *nbp;
	int n;

	if (bp->extra_len)
		return bp;
	for (l = &bp; *l; l = &(*l)->next) {
		nbp = *l;
		n = BLEN(nbp);
		if ((n << 2) < BALLOC(nbp)) {
			*l = block_alloc(n, MEM_WAIT);
			memmove((*l)->wp, nbp->rp, n);
			(*l)->wp += n;
			(*l)->next = nbp->next;
			freeb(nbp);
		}
	}

	return bp;
}

/* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
 * body_rp, for up to len.  Returns the len consumed.
 *
 * The base is 'b', so that we can kfree it later.  This currently ties us to
 * using kfree for the release method for all extra_data.
 *
 * It is possible to have a body size that is 0, if there is no offset, and
 * b->wp == b->rp.  This will have an extra data entry of 0 length. */
static size_t point_to_body(struct block *b, uint8_t *body_rp,
                            struct block *newb, unsigned int newb_idx,
                            size_t len)
{
	struct extra_bdata *ebd = &newb->extra_data[newb_idx];

	assert(newb_idx < newb->nr_extra_bufs);

	kmalloc_incref(b);
	ebd->base = (uintptr_t)b;
	ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
	ebd->len = MIN(b->wp - body_rp, len);	/* think of body_rp as b->rp */
	assert((int)ebd->len >= 0);
	newb->extra_len += ebd->len;
	return ebd->len;
}

/* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
 * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
 * consumed.
 *
 * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
                           struct block *newb, unsigned int newb_idx,
                           size_t len)
{
	struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
	struct extra_bdata *b_ebd = &b->extra_data[b_idx];

	assert(b_idx < b->nr_extra_bufs);
	assert(newb_idx < newb->nr_extra_bufs);

	kmalloc_incref((void*)b_ebd->base);
	n_ebd->base = b_ebd->base;
	n_ebd->off = b_ebd->off + b_off;
	n_ebd->len = MIN(b_ebd->len - b_off, len);
	newb->extra_len += n_ebd->len;
	return n_ebd->len;
}

/* given a string of blocks, sets up the new block's extra_data such that it
 * *points* to the contents of the blist [offset, len + offset).  This does not
 * make a separate copy of the contents of the blist.
 *
 * returns 0 on success.  the only failure is if the extra_data array was too
 * small, so this returns a positive integer saying how big the extra_data needs
 * to be.
 *
 * callers are responsible for protecting the list structure. */
static int __blist_clone_to(struct block *blist, struct block *newb, int len,
                            uint32_t offset)
{
	struct block *b, *first;
	unsigned int nr_bufs = 0;
	unsigned int b_idx, newb_idx = 0;
	uint8_t *first_main_body = 0;
	ssize_t sofar = 0;

	/* find the first block; keep offset relative to the latest b in the
	 * list */
	for (b = blist; b; b = b->next) {
		if (BLEN(b) > offset)
			break;
		offset -= BLEN(b);
	}
	/* qcopy semantics: if you asked for an offset outside the block list,
	 * you get an empty block back */
	if (!b)
		return 0;
	first = b;
	sofar -= offset; /* don't count the remaining offset in the first b */
	/* upper bound for how many buffers we'll need in newb */
	for (/* b is set*/; b; b = b->next) {
		nr_bufs += BHLEN(b) ? 1 : 0;
		/* still assuming nr_extra == nr_valid */
		nr_bufs += b->nr_extra_bufs;
		sofar += BLEN(b);
		if (sofar > len)
			break;
	}
	/* we might be holding a spinlock here, so we won't wait for kmalloc */
	if (block_add_extd(newb, nr_bufs, 0) != 0) {
		/* caller will need to alloc these, then re-call us */
		return nr_bufs;
	}
	for (b = first; b && len; b = b->next) {
		b_idx = 0;
		if (offset) {
			if (offset < BHLEN(b)) {
				/* off is in the main body */
				len -= point_to_body(b, b->rp + offset, newb,
						     newb_idx, len);
				newb_idx++;
			} else {
				/* off is in one of the buffers (or just past
				 * the last one).  we're not going to point to
				 * b's main body at all. */
				offset -= BHLEN(b);
				assert(b->extra_data);
				/* assuming these extrabufs are packed, or at
				 * least that len isn't gibberish */
				while (b->extra_data[b_idx].len <= offset) {
					offset -= b->extra_data[b_idx].len;
					b_idx++;
				}
				/* now offset is set to our offset in the
				 * b_idx'th buf */
				len -= point_to_buf(b, b_idx, offset, newb,
						    newb_idx, len);
				newb_idx++;
				b_idx++;
			}
			offset = 0;
		} else {
			if (BHLEN(b)) {
				len -= point_to_body(b, b->rp, newb, newb_idx,
						     len);
				newb_idx++;
			}
		}
		/* knock out all remaining bufs.  we only did one point_to_ op
		 * by now, and any point_to_ could be our last if it consumed
		 * all of len. */
		for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
			len -= point_to_buf(b, i, 0, newb, newb_idx, len);
			newb_idx++;
		}
	}
	return 0;
}

struct block *blist_clone(struct block *blist, int header_len, int len,
                          uint32_t offset)
{
	int ret;
	struct block *newb = block_alloc(header_len, MEM_WAIT);

	do {
		ret = __blist_clone_to(blist, newb, len, offset);
		if (ret)
			block_add_extd(newb, ret, MEM_WAIT);
	} while (ret);
	return newb;
}

/* given a queue, makes a single block with header_len reserved space in the
 * block main body, and the contents of [offset, len + offset) pointed to in the
 * new blocks ext_data.  This does not make a copy of the q's contents, though
 * you do have a ref count on the memory. */
struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
{
	int ret;
	struct block *newb = block_alloc(header_len, MEM_WAIT);
	/* the while loop should rarely be used: it would require someone
	 * concurrently adding to the queue. */
	do {
		/* TODO: RCU protect the q list (b->next) (need read lock) */
		spin_lock_irqsave(&q->lock);
		ret = __blist_clone_to(q->bfirst, newb, len, offset);
		spin_unlock_irqsave(&q->lock);
		if (ret)
			block_add_extd(newb, ret, MEM_WAIT);
	} while (ret);
	return newb;
}

/*
 *  copy from offset in the queue
 */
struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
{
	int sofar;
	int n;
	struct block *b, *nb;
	uint8_t *p;

	nb = block_alloc(len, MEM_WAIT);

	spin_lock_irqsave(&q->lock);

	/* go to offset */
	b = q->bfirst;
	for (sofar = 0;; sofar += n) {
		if (b == NULL) {
			spin_unlock_irqsave(&q->lock);
			return nb;
		}
		n = BLEN(b);
		if (sofar + n > offset) {
			p = b->rp + offset - sofar;
			n -= offset - sofar;
			break;
		}
		QDEBUG checkb(b, "qcopy");
		b = b->next;
	}

	/* copy bytes from there */
	for (sofar = 0; sofar < len;) {
		if (n > len - sofar)
			n = len - sofar;
		PANIC_EXTRA(b);
		memmove(nb->wp, p, n);
		qcopycnt += n;
		sofar += n;
		nb->wp += n;
		b = b->next;
		if (b == NULL)
			break;
		n = BLEN(b);
		p = b->rp;
	}
	spin_unlock_irqsave(&q->lock);

	return nb;
}

struct block *qcopy(struct queue *q, int len, uint32_t offset)
{
#ifdef CONFIG_BLOCK_EXTRAS
	return qclone(q, 0, len, offset);
#else
	return qcopy_old(q, len, offset);
#endif
}

static void qinit_common(struct queue *q)
{
	spinlock_init_irqsave(&q->lock);
	rendez_init(&q->rr);
	rendez_init(&q->wr);
}

/*
 *  called by non-interrupt code
 */
struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
{
	struct queue *q;

	q = kzmalloc(sizeof(struct queue), 0);
	if (q == 0)
		return 0;
	qinit_common(q);

	q->limit = q->inilim = limit;
	q->kick = kick;
	q->arg = arg;
	q->state = msg;
	q->eof = 0;

	return q;
}

/* open a queue to be bypassed */
struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
{
	struct queue *q;

	q = kzmalloc(sizeof(struct queue), 0);
	if (q == 0)
		return 0;
	qinit_common(q);

	q->limit = 0;
	q->arg = arg;
	q->bypass = bypass;
	q->state = 0;

	return q;
}

static int notempty(void *a)
{
	struct queue *q = a;

	return (q->state & Qclosed) || q->bfirst != 0;
}

/* Block, waiting for the queue to be non-empty or closed.  Returns with
 * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
 * was naturally closed.  Throws an error o/w. */
static bool qwait_and_ilock(struct queue *q, int qio_flags)
{
	while (1) {
		spin_lock_irqsave(&q->lock);
		if (q->bfirst != NULL)
			return TRUE;
		if (q->state & Qclosed) {
			if (++q->eof > 3) {
				spin_unlock_irqsave(&q->lock);
				error(EPIPE,
				      "multiple reads on a closed queue");
			}
			if (q->err[0]) {
				spin_unlock_irqsave(&q->lock);
				error(EPIPE, q->err);
			}
			return FALSE;
		}
		if (qio_flags & QIO_NON_BLOCK) {
			spin_unlock_irqsave(&q->lock);
			error(EAGAIN, "queue empty");
		}
		spin_unlock_irqsave(&q->lock);
		/* As with the producer side, we check for a condition while
		 * holding the q->lock, decide to sleep, then unlock.  It's like
		 * the "check, signal, check again" pattern, but we do it
		 * conditionally.  Both sides agree synchronously to do it, and
		 * those decisions are made while holding q->lock.  I think this
		 * is OK.
		 *
		 * The invariant is that no reader sleeps when the queue has
		 * data.  While holding the rendez lock, if we see there's no
		 * data, we'll sleep.  Since we saw there was no data, the next
		 * writer will see (or already saw) no data, and then the writer
		 * decides to rendez_wake, which will grab the rendez lock.  If
		 * the writer already did that, then we'll see notempty when we
		 * do our check-again. */
		rendez_sleep(&q->rr, notempty, q);
	}
}

/*
 * add a block list to a queue
 * XXX basically the same as enqueue blist, and has no locking!
 */
void qaddlist(struct queue *q, struct block *b)
{
	/* TODO: q lock? */
	/* queue the block */
	if (q->bfirst)
		q->blast->next = b;
	else
		q->bfirst = b;
	q->dlen += blocklen(b);
	while (b->next)
		b = b->next;
	q->blast = b;
}

static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
{
	size_t copy_amt, retval = 0;
	struct extra_bdata *ebd;

	copy_amt = MIN(BHLEN(b), amt);
	memcpy(to, b->rp, copy_amt);
	/* advance the rp, since this block might not be completely consumed and
	 * future reads need to know where to pick up from */
	b->rp += copy_amt;
	to += copy_amt;
	amt -= copy_amt;
	retval += copy_amt;
	for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
		ebd = &b->extra_data[i];
		/* skip empty entires.  if we track this in the struct block, we
		 * can just start the for loop early */
		if (!ebd->base || !ebd->len)
			continue;
		copy_amt = MIN(ebd->len, amt);
		memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
		/* we're actually consuming the entries, just like how we
		 * advance rp up above, and might only consume part of one. */
		ebd->len -= copy_amt;
		ebd->off += copy_amt;
		b->extra_len -= copy_amt;
		if (!ebd->len) {
			/* we don't actually have to decref here.  it's also
			 * done in freeb().  this is the earliest we can free.
			 */
			kfree((void*)ebd->base);
			ebd->base = ebd->off = 0;
		}
		to += copy_amt;
		amt -= copy_amt;
		retval += copy_amt;
	}
	return retval;
}

/*
 *  copy the contents of a string of blocks into
 *  memory.  emptied blocks are freed.  return
 *  pointer to first unconsumed block.
 */
struct block *bl2mem(uint8_t * p, struct block *b, int n)
{
	int i;
	struct block *next;

	/* could be slicker here, since read_from_block is smart */
	for (; b != NULL; b = next) {
		i = BLEN(b);
		if (i > n) {
			/* partial block, consume some */
			read_from_block(b, p, n);
			return b;
		}
		/* full block, consume all and move on */
		i = read_from_block(b, p, i);
		n -= i;
		p += i;
		next = b->next;
		freeb(b);
	}
	return NULL;
}

/* Extract the contents of all blocks and copy to va, up to len.  Returns the
 * actual amount copied. */
static size_t read_all_blocks(struct block *b, void *va, size_t len)
{
	size_t sofar = 0;
	struct block *next;

	do {
		assert(va);
		assert(b->rp);
		sofar += read_from_block(b, va + sofar, len - sofar);
		if (BLEN(b) && b->next)
			panic("Failed to drain entire block (Qmsg?) but had a next!");
		next = b->next;
		freeb(b);
		b = next;
	} while (b);
	return sofar;
}

/*
 *  copy the contents of memory into a string of blocks.
 *  return NULL on error.
 */
struct block *mem2bl(uint8_t * p, int len)
{
	ERRSTACK(1);
	int n;
	struct block *b, *first, **l;

	first = NULL;
	l = &first;
	if (waserror()) {
		freeblist(first);
		nexterror();
	}
	do {
		n = len;
		if (n > Maxatomic)
			n = Maxatomic;

		*l = b = block_alloc(n, MEM_WAIT);
		/* TODO consider extra_data */
		memmove(b->wp, p, n);
		b->wp += n;
		p += n;
		len -= n;
		l = &b->next;
	} while (len > 0);
	poperror();

	return first;
}

/*
 *  put a block back to the front of the queue
 *  called with q ilocked
 */
void qputback(struct queue *q, struct block *b)
{
	b->next = q->bfirst;
	if (q->bfirst == NULL)
		q->blast = b;
	q->bfirst = b;
	q->dlen += BLEN(b);
	/* qputback seems to undo a read, so we can undo the accounting too. */
	q->bytes_read -= BLEN(b);
}

/*
 *  get next block from a queue (up to a limit)
 *
 */
struct block *qbread(struct queue *q, size_t len)
{
	return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
			MEM_WAIT);
}

struct block *qbread_nonblock(struct queue *q, size_t len)
{
	return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
	                QIO_NON_BLOCK, MEM_WAIT);
}

/* read up to len from a queue into vp. */
size_t qread(struct queue *q, void *va, size_t len)
{
	struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);

	if (!blist)
		return 0;
	return read_all_blocks(blist, va, len);
}

size_t qread_nonblock(struct queue *q, void *va, size_t len)
{
	struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
				       QIO_NON_BLOCK, MEM_WAIT);

	if (!blist)
		return 0;
	return read_all_blocks(blist, va, len);
}

/* This is the rendez wake condition for writers. */
static int qwriter_should_wake(void *a)
{
	struct queue *q = a;

	return qwritable(q) || (q->state & Qclosed);
}

/* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
static size_t enqueue_blist(struct queue *q, struct block *b)
{
	size_t dlen;

	if (q->bfirst)
		q->blast->next = b;
	else
		q->bfirst = b;
	dlen = BLEN(b);
	while (b->next) {
		b = b->next;
		dlen += BLEN(b);
	}
	q->blast = b;
	q->dlen += dlen;
	return dlen;
}

/* Adds block (which can be a list of blocks) to the queue, subject to
 * qio_flags.  Returns the length written on success or -1 on non-throwable
 * error.  Adjust qio_flags to control the value-added features!. */
static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
{
	ssize_t ret;
	bool was_unreadable;

	if (q->bypass) {
		ret = blocklen(b);
		(*q->bypass) (q->arg, b);
		return ret;
	}
	spin_lock_irqsave(&q->lock);
	was_unreadable = q->dlen == 0;
	if (q->state & Qclosed) {
		spin_unlock_irqsave(&q->lock);
		freeblist(b);
		if (!(qio_flags & QIO_CAN_ERR_SLEEP))
			return -1;
		if (q->err[0])
			error(EPIPE, q->err);
		else
			error(EPIPE, "connection closed");
	}
	if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
		/* drop overflow takes priority over regular non-blocking */
		if ((qio_flags & QIO_DROP_OVERFLOW)
		    || (q->state & Qdropoverflow)) {
			spin_unlock_irqsave(&q->lock);
			freeb(b);
			return -1;
		}
		/* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
		 * nice and catch it. */
		if ((qio_flags & QIO_CAN_ERR_SLEEP)
		    && (qio_flags & QIO_NON_BLOCK)) {
			spin_unlock_irqsave(&q->lock);
			freeb(b);
			error(EAGAIN, "queue full");
		}
	}
	ret = enqueue_blist(q, b);
	QDEBUG checkb(b, "__qbwrite");
	spin_unlock_irqsave(&q->lock);
	/* TODO: not sure if the usage of a kick is mutually exclusive with a
	 * wakeup, meaning that actual users either want a kick or have
	 * qreaders. */
	if (q->kick && (was_unreadable || (q->state & Qkick)))
		q->kick(q->arg);
	if (was_unreadable) {
		/* Unlike the read side, there's no double-check to make sure
		 * the queue transitioned across an edge.  We know we added
		 * something, so that's enough.  We wake if the queue was empty.
		 * Both sides are the same, in that the condition for which we
		 * do the rendez_wakeup() is the same as the condition done for
		 * the rendez_sleep(). */
		rendez_wakeup(&q->rr);
		qwake_cb(q, FDTAP_FILT_READABLE);
	}
	/*
	 *  flow control, wait for queue to get below the limit
	 *  before allowing the process to continue and queue
	 *  more.  We do this here so that postnote can only
	 *  interrupt us after the data has been queued.  This
	 *  means that things like 9p flushes and ssl messages
	 *  will not be disrupted by software interrupts.
	 *
	 *  Note - this is moderately dangerous since a process
	 *  that keeps getting interrupted and rewriting will
	 *  queue infinite crud.
	 */
	if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
	    !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
		/* This is a racy peek at the q status.  If we accidentally
		 * block, our rendez will return.  The rendez's peak
		 * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
		 * (that lock protects writes, but not reads).
		 *
		 * Here's the deal: when holding the rendez lock, if we see the
		 * sleep condition, the consumer will wake us.  The condition
		 * will only ever be changed by the next qbread() (consumer,
		 * changes q->dlen).  That code will do a rendez wake, which
		 * will spin on the rendez lock, meaning it won't procede until
		 * we either see the new state (and return) or put ourselves on
		 * the rendez, and wake up.
		 *
		 * The pattern is one side writes mem, then signals.  Our side
		 * checks the signal, then reads the mem.  The goal is to not
		 * miss seeing the signal AND missing the memory write.  In this
		 * specific case, the signal is actually synchronous (the rendez
		 * lock) and not basic shared memory.
		 *
		 * Oh, and we spin in case we woke early and someone else filled
		 * the queue, mesa-style. */
		while (!qwriter_should_wake(q))
			rendez_sleep(&q->wr, qwriter_should_wake, q);
	}
	return ret;
}

/*
 *  add a block to a queue obeying flow control
 */
ssize_t qbwrite(struct queue *q, struct block *b)
{
	return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
}

ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
{
	return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
}

ssize_t qibwrite(struct queue *q, struct block *b)
{
	return __qbwrite(q, b, 0);
}

/* Helper, allocs a block and copies [from, from + len) into it.  Returns the
 * block on success, 0 on failure. */
static struct block *build_block(void *from, size_t len, int mem_flags)
{
	struct block *b;
	void *ext_buf;

	/* If len is small, we don't need to bother with the extra_data.  But
	 * until the whole stack can handle extd blocks, we'll use them
	 * unconditionally.  */
#ifdef CONFIG_BLOCK_EXTRAS
	/* allocb builds in 128 bytes of header space to all blocks, but this is
	 * only available via padblock (to the left).  we also need some space
	 * for pullupblock for some basic headers (like icmp) that get written
	 * in directly */
	b = block_alloc(64, mem_flags);
	if (!b)
		return 0;
	ext_buf = kmalloc(len, mem_flags);
	if (!ext_buf) {
		kfree(b);
		return 0;
	}
	memcpy(ext_buf, from, len);
	if (block_add_extd(b, 1, mem_flags)) {
		kfree(ext_buf);
		kfree(b);
		return 0;
	}
	b->extra_data[0].base = (uintptr_t)ext_buf;
	b->extra_data[0].off = 0;
	b->extra_data[0].len = len;
	b->extra_len += len;
#else
	b = block_alloc(len, mem_flags);
	if (!b)
		return 0;
	memmove(b->wp, from, len);
	b->wp += len;
#endif
	return b;
}

static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
                        int qio_flags)
{
	ERRSTACK(1);
	size_t n;
	volatile size_t sofar = 0;	/* volatile for the waserror */
	struct block *b;
	uint8_t *p = vp;
	void *ext_buf;

	/* Only some callers can throw.  Others might be in a context where
	 * waserror isn't safe. */
	if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
		/* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
		 * after some data has been sent should be treated as a partial
		 * write. */
		if (sofar)
			goto out_ok;
		nexterror();
	}
	do {
		n = len - sofar;
		/* This is 64K, the max amount per single block.  Still a good
		 * value? */
		if (n > Maxatomic)
			n = Maxatomic;
		b = build_block(p + sofar, n, mem_flags);
		if (!b)
			break;
		if (__qbwrite(q, b, qio_flags) < 0)
			break;
		sofar += n;
	} while ((sofar < len) && (q->state & Qmsg) == 0);
out_ok:
	if (qio_flags & QIO_CAN_ERR_SLEEP)
		poperror();
	return sofar;
}

ssize_t qwrite(struct queue *q, void *vp, int len)
{
	return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
}

ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
{
	return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
	                                      QIO_NON_BLOCK);
}

ssize_t qiwrite(struct queue *q, void *vp, int len)
{
	return __qwrite(q, vp, len, MEM_ATOMIC, 0);
}

/*
 *  be extremely careful when calling this,
 *  as there is no reference accounting
 */
void qfree(struct queue *q)
{
	qclose(q);
	kfree(q);
}

/*
 *  Mark a queue as closed.  No further IO is permitted.
 *  All blocks are released.
 */
void qclose(struct queue *q)
{
	struct block *bfirst;

	if (q == NULL)
		return;

	/* mark it */
	spin_lock_irqsave(&q->lock);
	q->state |= Qclosed;
	q->state &= ~Qdropoverflow;
	q->err[0] = 0;
	bfirst = q->bfirst;
	q->bfirst = 0;
	q->dlen = 0;
	spin_unlock_irqsave(&q->lock);

	/* free queued blocks */
	freeblist(bfirst);

	/* wake up readers/writers */
	rendez_wakeup(&q->rr);
	rendez_wakeup(&q->wr);
	qwake_cb(q, FDTAP_FILT_HANGUP);
}

/* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
 *
 * msg will be the errstr received by any waiters (qread, qbread, etc).  If
 * there is no message, which is what also happens during a natural qclose(),
 * those waiters will simply return 0.  qwriters will always error() on a
 * closed/hungup queue. */
void qhangup(struct queue *q, char *msg)
{
	/* mark it */
	spin_lock_irqsave(&q->lock);
	q->state |= Qclosed;
	if (msg == 0 || *msg == 0)
		q->err[0] = 0;
	else
		strlcpy(q->err, msg, ERRMAX);
	spin_unlock_irqsave(&q->lock);

	/* wake up readers/writers */
	rendez_wakeup(&q->rr);
	rendez_wakeup(&q->wr);
	qwake_cb(q, FDTAP_FILT_HANGUP);
}

/*
 *  return non-zero if the q is hungup
 */
int qisclosed(struct queue *q)
{
	return q->state & Qclosed;
}

/*
 *  mark a queue as no longer hung up.  resets the wake_cb.
 */
void qreopen(struct queue *q)
{
	spin_lock_irqsave(&q->lock);
	q->state &= ~Qclosed;
	q->eof = 0;
	q->limit = q->inilim;
	q->wake_cb = 0;
	q->wake_data = 0;
	spin_unlock_irqsave(&q->lock);
}

/*
 *  return bytes queued
 */
int qlen(struct queue *q)
{
	return q->dlen;
}

size_t q_bytes_read(struct queue *q)
{
	return q->bytes_read;
}

/*
 * return space remaining before flow control
 *
 *  This used to be
 *  q->len < q->limit/2
 *  but it slows down tcp too much for certain write sizes.
 *  I really don't understand it completely.  It may be
 *  due to the queue draining so fast that the transmission
 *  stalls waiting for the app to produce more data.  - presotto
 *
 *  q->len was the amount of bytes, which is no longer used.  we now use
 *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
 */
int qwindow(struct queue *q)
{
	int l;

	l = q->limit - q->dlen;
	if (l < 0)
		l = 0;
	return l;
}

/*
 *  return true if we can read without blocking
 */
int qcanread(struct queue *q)
{
	return q->bfirst != 0;
}

/*
 *  change queue limit
 */
void qsetlimit(struct queue *q, size_t limit)
{
	bool was_writable = qwritable(q);

	q->limit = limit;
	if (!was_writable && qwritable(q)) {
		rendez_wakeup(&q->wr);
		qwake_cb(q, FDTAP_FILT_WRITABLE);
	}
}

size_t qgetlimit(struct queue *q)
{
	return q->limit;
}

/*
 *  set whether writes drop overflowing blocks, or if we sleep
 */
void qdropoverflow(struct queue *q, bool onoff)
{
	spin_lock_irqsave(&q->lock);
	if (onoff)
		q->state |= Qdropoverflow;
	else
		q->state &= ~Qdropoverflow;
	spin_unlock_irqsave(&q->lock);
}

/* Be careful: this can affect concurrent reads/writes and code that might have
 * built-in expectations of the q's type. */
void q_toggle_qmsg(struct queue *q, bool onoff)
{
	spin_lock_irqsave(&q->lock);
	if (onoff)
		q->state |= Qmsg;
	else
		q->state &= ~Qmsg;
	spin_unlock_irqsave(&q->lock);
}

/* Be careful: this can affect concurrent reads/writes and code that might have
 * built-in expectations of the q's type. */
void q_toggle_qcoalesce(struct queue *q, bool onoff)
{
	spin_lock_irqsave(&q->lock);
	if (onoff)
		q->state |= Qcoalesce;
	else
		q->state &= ~Qcoalesce;
	spin_unlock_irqsave(&q->lock);
}

/*
 *  flush the output queue
 */
void qflush(struct queue *q)
{
	struct block *bfirst;

	/* mark it */
	spin_lock_irqsave(&q->lock);
	bfirst = q->bfirst;
	q->bfirst = 0;
	q->dlen = 0;
	spin_unlock_irqsave(&q->lock);

	/* free queued blocks */
	freeblist(bfirst);

	/* wake up writers */
	rendez_wakeup(&q->wr);
	qwake_cb(q, FDTAP_FILT_WRITABLE);
}

int qfull(struct queue *q)
{
	return !qwritable(q);
}

int qstate(struct queue *q)
{
	return q->state;
}

void qdump(struct queue *q)
{
	if (q)
		printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
			   q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
}

/* On certain wakeup events, qio will call func(q, data, filter), where filter
 * marks the type of wakeup event (flags from FDTAP).
 *
 * There's no sync protection.  If you change the CB while the qio is running,
 * you might get a CB with the data or func from a previous set_wake_cb.  You
 * should set this once per queue and forget it.
 *
 * You can remove the CB by passing in 0 for the func.  Alternatively, you can
 * just make sure that the func(data) pair are valid until the queue is freed or
 * reopened. */
void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
{
	q->wake_data = data;
	wmb();	/* if we see func, we'll also see the data for it */
	q->wake_cb = func;
}

/* Helper for detecting whether we'll block on a read at this instant. */
bool qreadable(struct queue *q)
{
	return qlen(q) > 0;
}

/* Helper for detecting whether we'll block on a write at this instant. */
bool qwritable(struct queue *q)
{
	return !q->limit || qwindow(q) > 0;
}
