qio: Remove q->len q->dlen is the amount of data bytes in the queue: the bytes that our clients are adding and removing. q->len was the amount of total bytes in the queue, including metadata, block structs, and anything else. If you had a block with 0 data bytes in the queue, the size of the block itself counted against q->len. Note that the qlen() function actually returns q->dlen. The only time we really used q->len was for flow control, and that's where it caused problems. I had a pipe where the reader didn't wake a writer. The reader saw 'was_unwritable', then popped a block of length 0, and did a retry (this was the Qcoalesce case). However, popping that block made the queue writable again, which was unexpected since the reader got nothing. Instead of mucking around with q->len and q->dlen any further, I just yanked q->len completely. It's not clear that we even wanted q->len, and it's been the source of bugs and confusion for a while now. Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
diff --git a/kern/src/ns/qio.c b/kern/src/ns/qio.c index 8568990..07c58c0 100644 --- a/kern/src/ns/qio.c +++ b/kern/src/ns/qio.c
@@ -71,7 +71,6 @@ struct block *bfirst; /* buffer */ struct block *blast; - int len; /* bytes allocated to queue */ int dlen; /* data bytes in queue */ int limit; /* max bytes in queue */ int inilim; /* initial limit */ @@ -610,7 +609,6 @@ { struct block *b = q->bfirst; - q->len -= BALLOC(b); // XXX all usages of q->len with extra_data are fucked q->dlen -= BLEN(b); q->bfirst = b->next; b->next = 0; @@ -630,7 +628,6 @@ static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt) { b->extra_len -= amt; - q->len -= amt; q->dlen -= amt; } @@ -1257,7 +1254,6 @@ q->blast->next = b; else q->bfirst = b; - q->len += blockalloclen(b); q->dlen += blocklen(b); while (b->next) b = b->next; @@ -1396,7 +1392,6 @@ if (q->bfirst == NULL) q->blast = b; q->bfirst = b; - q->len += BALLOC(b); q->dlen += BLEN(b); } @@ -1445,21 +1440,18 @@ /* Helper: enqueues a list of blocks to a queue. Returns the total length. */ static size_t enqueue_blist(struct queue *q, struct block *b) { - size_t len, dlen; + size_t dlen; if (q->bfirst) q->blast->next = b; else q->bfirst = b; - len = BALLOC(b); dlen = BLEN(b); while (b->next) { b = b->next; - len += BALLOC(b); dlen += BLEN(b); } q->blast = b; - q->len += len; q->dlen += dlen; return dlen; } @@ -1478,7 +1470,7 @@ return ret; } spin_lock_irqsave(&q->lock); - was_unreadable = q->len == 0; + was_unreadable = q->dlen == 0; if (q->state & Qclosed) { spin_unlock_irqsave(&q->lock); freeblist(b); @@ -1489,7 +1481,7 @@ else error(EPIPE, "connection closed"); } - if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) { + if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) { /* drop overflow takes priority over regular non-blocking */ if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) { spin_unlock_irqsave(&q->lock); @@ -1540,7 +1532,7 @@ * * Here's the deal: when holding the rendez lock, if we see the sleep * condition, the consumer will wake us. The condition will only ever - * be changed by the next qbread() (consumer, changes q->len). That + * be changed by the next qbread() (consumer, changes q->dlen). That * code will do a rendez wake, which will spin on the rendez lock, * meaning it won't procede until we either see the new state (and * return) or put ourselves on the rendez, and wake up. @@ -1688,7 +1680,6 @@ q->err[0] = 0; bfirst = q->bfirst; q->bfirst = 0; - q->len = 0; q->dlen = 0; spin_unlock_irqsave(&q->lock); @@ -1763,12 +1754,15 @@ * I really don't understand it completely. It may be * due to the queue draining so fast that the transmission * stalls waiting for the app to produce more data. - presotto + * + * q->len was the amount of bytes, which is no longer used. we now use + * q->dlen, the amount of usable data. a.k.a. qlen()... - brho */ int qwindow(struct queue *q) { int l; - l = q->limit - q->len; + l = q->limit - q->dlen; if (l < 0) l = 0; return l; @@ -1838,7 +1832,6 @@ spin_lock_irqsave(&q->lock); bfirst = q->bfirst; q->bfirst = 0; - q->len = 0; q->dlen = 0; spin_unlock_irqsave(&q->lock); @@ -1852,7 +1845,7 @@ int qfull(struct queue *q) { - return q->len >= q->limit; + return q->dlen >= q->limit; } int qstate(struct queue *q) @@ -1863,8 +1856,8 @@ void qdump(struct queue *q) { if (q) - printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n", - q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state); + printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n", + q, q->bfirst, q->blast, q->dlen, q->limit, q->state); } /* On certain wakeup events, qio will call func(q, data, filter), where filter