blob: 1e3d8986c68e5abc4dc576f218a6ca803ea1eba3 [file] [log] [blame]
#include <vfs.h>
#include <kfs.h>
#include <slab.h>
#include <kmalloc.h>
#include <kref.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <error.h>
#include <cpio.h>
#include <pmap.h>
#include <smp.h>
static uint32_t padblockcnt;
static uint32_t concatblockcnt;
static uint32_t pullupblockcnt;
static uint32_t copyblockcnt;
static uint32_t consumecnt;
static uint32_t producecnt;
static uint32_t qcopycnt;
static int debugging;
#define QDEBUG if(0)
/*
* IO queues
*/
enum
{
Maxatomic = 64*1024,
};
unsigned int qiomaxatomic = Maxatomic;
void
ixsummary(void)
{
debugging ^= 1;
iallocsummary();
printd("pad %lud, concat %lud, pullup %lud, copy %lud\n",
padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
printd("consume %lud, produce %lud, qcopy %lud\n",
consumecnt, producecnt, qcopycnt);
}
/*
* free a list of blocks
*/
void
freeblist(struct block *b)
{
struct block *next;
for(; b != 0; b = next){
next = b->next;
b->next = 0;
freeb(b);
}
}
/*
* pad a block to the front (or the back if size is negative)
*/
struct block*
padblock(struct block *bp, int size)
{
int n;
struct block *nbp;
QDEBUG checkb(bp, "padblock 1");
if(size >= 0){
if(bp->rp - bp->base >= size){
bp->rp -= size;
return bp;
}
if(bp->next)
panic("padblock %#p", getcallerpc(&bp));
n = BLEN(bp);
padblockcnt++;
nbp = allocb(size+n);
nbp->rp += size;
nbp->wp = nbp->rp;
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
freeb(bp);
nbp->rp -= size;
} else {
size = -size;
if(bp->next)
panic("padblock %#p", getcallerpc(&bp));
if(bp->lim - bp->wp >= size)
return bp;
n = BLEN(bp);
padblockcnt++;
nbp = allocb(size+n);
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
freeb(bp);
}
QDEBUG checkb(nbp, "padblock 1");
return nbp;
}
/*
* return count of bytes in a string of blocks
*/
int
blocklen(struct block *bp)
{
int len;
len = 0;
while(bp) {
len += BLEN(bp);
bp = bp->next;
}
return len;
}
/*
* return count of space in blocks
*/
int
blockalloclen(struct block *bp)
{
int len;
len = 0;
while(bp) {
len += BALLOC(bp);
bp = bp->next;
}
return len;
}
/*
* copy the string of blocks into
* a single block and free the string
*/
struct block*
concatblock(struct block *bp)
{
int len;
struct block *nb, *f;
if(bp->next == 0)
return bp;
nb = allocb(blocklen(bp));
for(f = bp; f; f = f->next) {
len = BLEN(f);
memmove(nb->wp, f->rp, len);
nb->wp += len;
}
concatblockcnt += BLEN(nb);
freeblist(bp);
QDEBUG checkb(nb, "concatblock 1");
return nb;
}
/*
* make sure the first block has at least n bytes
*/
struct block*
pullupblock(struct block *bp, int n)
{
int i;
struct block *nbp;
/*
* this should almost always be true, it's
* just to avoid every caller checking.
*/
if(BLEN(bp) >= n)
return bp;
/*
* if not enough room in the first block,
* add another to the front of the list.
*/
if(bp->lim - bp->rp < n){
nbp = allocb(n);
nbp->next = bp;
bp = nbp;
}
/*
* copy bytes from the trailing blocks into the first
*/
n -= BLEN(bp);
while((nbp = bp->next)){
i = BLEN(nbp);
if(i > n) {
memmove(bp->wp, nbp->rp, n);
pullupblockcnt++;
bp->wp += n;
nbp->rp += n;
QDEBUG checkb(bp, "pullupblock 1");
return bp;
} else {
/* shouldn't happen but why crash if it does */
if(i < 0){
printd("pullupblock -ve length, from %#p\n",
getcallerpc(&bp));
i = 0;
}
memmove(bp->wp, nbp->rp, i);
pullupblockcnt++;
bp->wp += i;
bp->next = nbp->next;
nbp->next = 0;
freeb(nbp);
n -= i;
if(n == 0){
QDEBUG checkb(bp, "pullupblock 2");
return bp;
}
}
}
freeb(bp);
return 0;
}
/*
* make sure the first block has at least n bytes
*/
struct block*
pullupqueue(struct queue *q, int n)
{
struct block *b;
if(BLEN(q->bfirst) >= n)
return q->bfirst;
q->bfirst = pullupblock(q->bfirst, n);
for(b = q->bfirst; b != NULL && b->next != NULL; b = b->next)
;
q->blast = b;
return q->bfirst;
}
/*
* trim to len bytes starting at offset
*/
struct block *
trimblock(struct block *bp, int offset, int len)
{
long l;
struct block *nb, *startb;
QDEBUG checkb(bp, "trimblock 1");
if(blocklen(bp) < offset+len) {
freeblist(bp);
return NULL;
}
while((l = BLEN(bp)) < offset) {
offset -= l;
nb = bp->next;
bp->next = NULL;
freeb(bp);
bp = nb;
}
startb = bp;
bp->rp += offset;
while((l = BLEN(bp)) < len) {
len -= l;
bp = bp->next;
}
bp->wp -= (BLEN(bp) - len);
if(bp->next) {
freeblist(bp->next);
bp->next = NULL;
}
return startb;
}
/*
* copy 'count' bytes into a new block
*/
struct block*
copyblock(struct block *bp, int count)
{
int l;
struct block *nbp;
QDEBUG checkb(bp, "copyblock 0");
if(bp->flag & BINTR){
nbp = iallocb(count);
if(nbp == NULL)
return NULL;
}else
nbp = allocb(count);
for(; count > 0 && bp != 0; bp = bp->next){
l = BLEN(bp);
if(l > count)
l = count;
memmove(nbp->wp, bp->rp, l);
nbp->wp += l;
count -= l;
}
if(count > 0){
memset(nbp->wp, 0, count);
nbp->wp += count;
}
copyblockcnt++;
QDEBUG checkb(nbp, "copyblock 1");
return nbp;
}
struct block*
adjustblock(struct block* bp, int len)
{
int n;
struct block *nbp;
if(len < 0){
freeb(bp);
return NULL;
}
if(bp->rp+len > bp->lim){
nbp = copyblock(bp, len);
freeblist(bp);
QDEBUG checkb(nbp, "adjustblock 1");
return nbp;
}
n = BLEN(bp);
if(len > n)
memset(bp->wp, 0, len-n);
bp->wp = bp->rp+len;
QDEBUG checkb(bp, "adjustblock 2");
return bp;
}
/*
* throw away up to count bytes from a
* list of blocks. Return count of bytes
* thrown away.
*/
int
pullblock(struct block **bph, int count)
{
struct block *bp;
int n, bytes;
bytes = 0;
if(bph == NULL)
return 0;
while(*bph != NULL && count != 0) {
bp = *bph;
n = BLEN(bp);
if(count < n)
n = count;
bytes += n;
count -= n;
bp->rp += n;
QDEBUG checkb(bp, "pullblock ");
if(BLEN(bp) == 0) {
*bph = bp->next;
bp->next = NULL;
freeb(bp);
}
}
return bytes;
}
/*
* get next block from a queue, return null if nothing there
*/
struct block*
qget(struct queue *q)
{
int dowakeup;
struct block *b;
/* sync with qwrite */
ilock(&q->lock);
b = q->bfirst;
if(b == NULL){
q->state |= Qstarve;
iunlock(&q->lock);
return NULL;
}
q->bfirst = b->next;
b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
QDEBUG checkb(b, "qget");
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->dlen < q->limit/2){
q->state &= ~Qflow;
dowakeup = 1;
} else
dowakeup = 0;
iunlock(&q->lock);
if(dowakeup)
rendez_wakeup(&q->wr);
return b;
}
/*
* throw away the next 'len' bytes in the queue
*/
int
qdiscard(struct queue *q, int len)
{
struct block *b;
int dowakeup, n, sofar;
ilock(&q->lock);
for(sofar = 0; sofar < len; sofar += n){
b = q->bfirst;
if(b == NULL)
break;
QDEBUG checkb(b, "qdiscard");
n = BLEN(b);
if(n <= len - sofar){
q->bfirst = b->next;
b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
freeb(b);
} else {
n = len - sofar;
b->rp += n;
q->dlen -= n;
}
}
/*
* if writer flow controlled, restart
*
* This used to be
* q->len < q->limit/2
* but it slows down tcp too much for certain write sizes.
* I really don't understand it completely. It may be
* due to the queue draining so fast that the transmission
* stalls waiting for the app to produce more data. - presotto
*
* changed back from q->len < q->limit for reno tcp. - jmk
*
* changed from q->len to q->dlen. - brho
*/
if((q->state & Qflow) && q->dlen < q->limit/2){
q->state &= ~Qflow;
dowakeup = 1;
} else
dowakeup = 0;
iunlock(&q->lock);
if(dowakeup)
rendez_wakeup(&q->wr);
return sofar;
}
/*
* Interrupt level copy out of a queue, return # bytes copied.
*/
int
qconsume(struct queue *q, void *vp, int len)
{
struct block *b;
int n, dowakeup;
uint8_t *p = vp;
struct block *tofree = NULL;
/* sync with qwrite */
ilock(&q->lock);
for(;;) {
b = q->bfirst;
if(b == 0){
q->state |= Qstarve;
iunlock(&q->lock);
return -1;
}
QDEBUG checkb(b, "qconsume 1");
n = BLEN(b);
if(n > 0)
break;
q->bfirst = b->next;
q->len -= BALLOC(b);
/* remember to free this */
b->next = tofree;
tofree = b;
};
if(n < len)
len = n;
memmove(p, b->rp, len);
consumecnt += n;
b->rp += len;
q->dlen -= len;
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
/* remember to free this */
b->next = tofree;
tofree = b;
}
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->dlen < q->limit/2){
q->state &= ~Qflow;
dowakeup = 1;
} else
dowakeup = 0;
iunlock(&q->lock);
if(dowakeup)
rendez_wakeup(&q->wr);
if(tofree != NULL)
freeblist(tofree);
return len;
}
int
qpass(struct queue *q, struct block *b)
{
int dlen, len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(&q->lock);
if(q->dlen >= q->limit){
freeblist(b);
iunlock(&q->lock);
return -1;
}
if(q->state & Qclosed){
len = BALLOC(b);
freeblist(b);
iunlock(&q->lock);
return len;
}
/* add buffer to queue */
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
len = BALLOC(b);
dlen = BLEN(b);
QDEBUG checkb(b, "qpass");
while(b->next){
b = b->next;
QDEBUG checkb(b, "qpass");
len += BALLOC(b);
dlen += BLEN(b);
}
q->blast = b;
q->len += len;
q->dlen += dlen;
if(q->dlen >= q->limit/2)
q->state |= Qflow;
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
iunlock(&q->lock);
if(dowakeup)
rendez_wakeup(&q->rr);
return len;
}
int
qpassnolim(struct queue *q, struct block *b)
{
int dlen, len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(&q->lock);
if(q->state & Qclosed){
len = BALLOC(b);
freeblist(b);
iunlock(&q->lock);
return len;
}
/* add buffer to queue */
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
len = BALLOC(b);
dlen = BLEN(b);
QDEBUG checkb(b, "qpass");
while(b->next){
b = b->next;
QDEBUG checkb(b, "qpass");
len += BALLOC(b);
dlen += BLEN(b);
}
q->blast = b;
q->len += len;
q->dlen += dlen;
if(q->dlen >= q->limit/2)
q->state |= Qflow;
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
iunlock(&q->lock);
if(dowakeup)
rendez_wakeup(&q->rr);
return len;
}
/*
* if the allocated space is way out of line with the used
* space, reallocate to a smaller block
*/
struct block*
packblock(struct block *bp)
{
struct block **l, *nbp;
int n;
for(l = &bp; *l; l = &(*l)->next){
nbp = *l;
n = BLEN(nbp);
if((n<<2) < BALLOC(nbp)){
*l = allocb(n);
memmove((*l)->wp, nbp->rp, n);
(*l)->wp += n;
(*l)->next = nbp->next;
freeb(nbp);
}
}
return bp;
}
int
qproduce(struct queue *q, void *vp, int len)
{
struct block *b;
int dowakeup;
uint8_t *p = vp;
/* sync with qread */
dowakeup = 0;
ilock(&q->lock);
/* no waiting receivers, room in buffer? */
if(q->dlen >= q->limit){
q->state |= Qflow;
iunlock(&q->lock);
return -1;
}
/* save in buffer */
b = iallocb(len);
if(b == 0){
iunlock(&q->lock);
return 0;
}
memmove(b->wp, p, len);
producecnt += len;
b->wp += len;
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
/* b->next = 0; done by iallocb() */
q->len += BALLOC(b);
q->dlen += BLEN(b);
QDEBUG checkb(b, "qproduce");
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
if(q->dlen >= q->limit)
q->state |= Qflow;
iunlock(&q->lock);
if(dowakeup)
rendez_wakeup(&q->rr);
return len;
}
/*
* copy from offset in the queue
*/
struct block*
qcopy(struct queue *q, int len, uint32_t offset)
{
int sofar;
int n;
struct block *b, *nb;
uint8_t *p;
nb = allocb(len);
ilock(&q->lock);
/* go to offset */
b = q->bfirst;
for(sofar = 0; ; sofar += n){
if(b == NULL){
iunlock(&q->lock);
return nb;
}
n = BLEN(b);
if(sofar + n > offset){
p = b->rp + offset - sofar;
n -= offset - sofar;
break;
}
QDEBUG checkb(b, "qcopy");
b = b->next;
}
/* copy bytes from there */
for(sofar = 0; sofar < len;){
if(n > len - sofar)
n = len - sofar;
memmove(nb->wp, p, n);
qcopycnt += n;
sofar += n;
nb->wp += n;
b = b->next;
if(b == NULL)
break;
n = BLEN(b);
p = b->rp;
}
iunlock(&q->lock);
return nb;
}
static void qcommon_init(struct queue *q)
{
spinlock_init_irqsave(&q->lock);
rendez_init(&q->rr);
rendez_init(&q->wr);
qlock_init(&q->rlock);
qlock_init(&q->wlock);
}
/*
* called by non-interrupt code
*/
struct queue*
qopen(int limit, int msg, void (*kick)(void*), void *arg)
{
struct queue *q;
q = kzmalloc(sizeof(struct queue), 0);
if(q == 0)
return 0;
q->limit = q->inilim = limit;
q->kick = kick;
q->arg = arg;
q->state = msg;
q->state |= Qstarve;
q->eof = 0;
q->noblock = 0;
qcommon_init(q);
return q;
}
/* open a queue to be bypassed */
struct queue*
qbypass(void (*bypass)(void*, struct block*), void *arg)
{
struct queue *q;
q = kzmalloc(sizeof(struct queue), 0);
if(q == 0)
return 0;
q->limit = 0;
q->arg = arg;
q->bypass = bypass;
q->state = 0;
qcommon_init(q);
return q;
}
static int
notempty(void *a)
{
struct queue *q = a;
return (q->state & Qclosed) || q->bfirst != 0;
}
/*
* wait for the queue to be non-empty or closed.
* called with q ilocked.
*/
static int
qwait(struct queue *q)
{
/* wait for data */
for(;;){
if(q->bfirst != NULL)
break;
if(q->state & Qclosed){
if(++q->eof > 3)
return -1;
if(*q->err && strcmp(q->err, Ehungup) != 0)
return -1;
return 0;
}
q->state |= Qstarve; /* flag requesting producer to wake me */
iunlock(&q->lock);
rendez_sleep(&q->rr, notempty, q);
ilock(&q->lock);
}
return 1;
}
/*
* add a block list to a queue
*/
void
qaddlist(struct queue *q, struct block *b)
{
/* queue the block */
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->len += blockalloclen(b);
q->dlen += blocklen(b);
while(b->next)
b = b->next;
q->blast = b;
}
/*
* called with q ilocked
*/
struct block*
qremove(struct queue *q)
{
struct block *b;
b = q->bfirst;
if(b == NULL)
return NULL;
q->bfirst = b->next;
b->next = NULL;
q->dlen -= BLEN(b);
q->len -= BALLOC(b);
QDEBUG checkb(b, "qremove");
return b;
}
/*
* copy the contents of a string of blocks into
* memory. emptied blocks are freed. return
* pointer to first unconsumed block.
*/
struct block*
bl2mem(uint8_t *p, struct block *b, int n)
{
int i;
struct block *next;
for(; b != NULL; b = next){
i = BLEN(b);
if(i > n){
memmove(p, b->rp, n);
b->rp += n;
return b;
}
memmove(p, b->rp, i);
n -= i;
p += i;
b->rp += i;
next = b->next;
freeb(b);
}
return NULL;
}
/*
* copy the contents of memory into a string of blocks.
* return NULL on error.
*/
struct block*
mem2bl(uint8_t *p, int len)
{
ERRSTACK(2);
int n;
struct block *b, *first, **l;
first = NULL;
l = &first;
if(waserror()){
freeblist(first);
nexterror();
}
do {
n = len;
if(n > Maxatomic)
n = Maxatomic;
*l = b = allocb(n);
memmove(b->wp, p, n);
b->wp += n;
p += n;
len -= n;
l = &b->next;
} while(len > 0);
poperror();
return first;
}
/*
* put a block back to the front of the queue
* called with q ilocked
*/
void
qputback(struct queue *q, struct block *b)
{
b->next = q->bfirst;
if(q->bfirst == NULL)
q->blast = b;
q->bfirst = b;
q->len += BALLOC(b);
q->dlen += BLEN(b);
}
/*
* flow control, get producer going again
* called with q ilocked
*/
static void
qwakeup_iunlock(struct queue *q)
{
int dowakeup;
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->dlen < q->limit/2){
q->state &= ~Qflow;
dowakeup = 1;
}
else
dowakeup = 0;
iunlock(&q->lock);
/* wakeup flow controlled writers */
if(dowakeup){
if(q->kick)
q->kick(q->arg);
rendez_wakeup(&q->wr);
}
}
/*
* get next block from a queue (up to a limit)
*/
struct block*
qbread(struct queue *q, int len)
{
ERRSTACK(2);
struct block *b, *nb;
int n;
qlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
nexterror();
}
ilock(&q->lock);
switch(qwait(q)){
case 0:
/* queue closed */
iunlock(&q->lock);
qunlock(&q->rlock);
poperror();
return NULL;
case -1:
/* multiple reads on a closed queue */
iunlock(&q->lock);
error(q->err);
}
/* if we get here, there's at least one block in the queue */
b = qremove(q);
n = BLEN(b);
/* split block if it's too big and this is not a message queue */
nb = b;
if(n > len){
if((q->state&Qmsg) == 0){
n -= len;
b = allocb(n);
memmove(b->wp, nb->rp+len, n);
b->wp += n;
qputback(q, b);
}
nb->wp = nb->rp + len;
}
/* restart producer */
qwakeup_iunlock(q);
poperror();
qunlock(&q->rlock);
return nb;
}
/*
* read a queue. if no data is queued, post a Block
* and wait on its Rendez.
*/
long
qread(struct queue *q, void *vp, int len)
{
ERRSTACK(2);
struct block *b, *first, **l;
int blen, n;
qlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
nexterror();
}
ilock(&q->lock);
again:
switch(qwait(q)){
case 0:
/* queue closed */
iunlock(&q->lock);
qunlock(&q->rlock);
poperror();
return 0;
case -1:
/* multiple reads on a closed queue */
iunlock(&q->lock);
error(q->err);
}
/* if we get here, there's at least one block in the queue */
if(q->state & Qcoalesce){
/* when coalescing, 0 length blocks just go away */
b = q->bfirst;
if(BLEN(b) <= 0){
freeb(qremove(q));
goto again;
}
/* grab the first block plus as many
* following blocks as will completely
* fit in the read.
*/
n = 0;
l = &first;
blen = BLEN(b);
for(;;) {
*l = qremove(q);
l = &b->next;
n += blen;
b = q->bfirst;
if(b == NULL)
break;
blen = BLEN(b);
if(n+blen > len)
break;
}
} else {
first = qremove(q);
n = BLEN(first);
}
/* copy to user space outside of the ilock */
iunlock(&q->lock);
b = bl2mem(vp, first, len);
ilock(&q->lock);
/* take care of any left over partial block */
if(b != NULL){
n -= BLEN(b);
if(q->state & Qmsg)
freeb(b);
else
qputback(q, b);
}
/* restart producer */
qwakeup_iunlock(q);
poperror();
qunlock(&q->rlock);
return n;
}
static int
qnotfull(void *a)
{
struct queue *q = a;
return q->dlen < q->limit || (q->state & Qclosed);
}
uint32_t noblockcnt;
/*
* add a block to a queue obeying flow control
*/
long
qbwrite(struct queue *q, struct block *b)
{
ERRSTACK(2);
int n, dowakeup;
n = BLEN(b);
if(q->bypass){
(*q->bypass)(q->arg, b);
return n;
}
dowakeup = 0;
qlock(&q->wlock);
if(waserror()){
if(b != NULL)
freeb(b);
qunlock(&q->wlock);
nexterror();
}
ilock(&q->lock);
/* give up if the queue is closed */
if(q->state & Qclosed){
iunlock(&q->lock);
error(q->err);
}
/* if nonblocking, don't queue over the limit */
if(q->dlen >= q->limit){
if(q->noblock){
iunlock(&q->lock);
freeb(b);
noblockcnt += n;
qunlock(&q->wlock);
poperror();
return n;
}
}
/* queue the block */
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
b->next = 0;
q->len += BALLOC(b);
q->dlen += n;
QDEBUG checkb(b, "qbwrite");
b = NULL;
/* make sure other end gets awakened */
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
iunlock(&q->lock);
/* get output going again */
if(q->kick && (dowakeup || (q->state&Qkick)))
q->kick(q->arg);
/* wakeup anyone consuming at the other end */
if(dowakeup)
rendez_wakeup(&q->rr);
/*
* flow control, wait for queue to get below the limit
* before allowing the process to continue and queue
* more. We do this here so that postnote can only
* interrupt us after the data has been queued. This
* means that things like 9p flushes and ssl messages
* will not be disrupted by software interrupts.
*
* Note - this is moderately dangerous since a process
* that keeps getting interrupted and rewriting will
* queue infinite crud.
*/
for(;;){
if(q->noblock || qnotfull(q))
break;
ilock(&q->lock);
q->state |= Qflow;
iunlock(&q->lock);
rendez_sleep(&q->wr, qnotfull, q);
}
qunlock(&q->wlock);
poperror();
return n;
}
/*
* write to a queue. only Maxatomic bytes at a time is atomic.
*/
int
qwrite(struct queue *q, void *vp, int len)
{
ERRSTACK(1);
int n, sofar;
struct block *b;
uint8_t *p = vp;
//QDEBUG if(!islo()) printd("qwrite hi %#p\n", getcallerpc(&q));
sofar = 0;
do {
n = len-sofar;
if(n > Maxatomic)
n = Maxatomic;
b = allocb(n);
if(waserror()){
freeb(b);
nexterror();
}
memmove(b->wp, p+sofar, n);
poperror();
b->wp += n;
qbwrite(q, b);
sofar += n;
} while(sofar < len && (q->state & Qmsg) == 0);
return len;
}
/*
* used by print() to write to a queue. Since we may be splhi or not in
* a process, don't qlock. (brho: i think this means it might be called from
* IRQ context)
*
* this routine merges adjacent blocks if block n+1 will fit into
* the free space of block n.
*/
int
qiwrite(struct queue *q, void *vp, int len)
{
int n, sofar, dowakeup;
struct block *b;
uint8_t *p = vp;
dowakeup = 0;
sofar = 0;
do {
n = len-sofar;
if(n > Maxatomic)
n = Maxatomic;
b = iallocb(n);
if(b == NULL)
break;
memmove(b->wp, p+sofar, n);
b->wp += n;
ilock(&q->lock);
/* we use an artificially high limit for kernel prints since anything
* over the limit gets dropped
* Note: this has always been dlen, even before changing the limit
* checks from len to dlen. (brho)
*/
if(q->dlen >= 16*1024){
iunlock(&q->lock);
freeb(b);
break;
}
QDEBUG checkb(b, "qiwrite");
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
q->len += BALLOC(b);
q->dlen += n;
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
iunlock(&q->lock);
if(dowakeup){
if(q->kick)
q->kick(q->arg);
rendez_wakeup(&q->rr);
}
sofar += n;
} while(sofar < len && (q->state & Qmsg) == 0);
return sofar;
}
/*
* be extremely careful when calling this,
* as there is no reference accounting
*/
void
qfree(struct queue *q)
{
qclose(q);
kfree(q);
}
/*
* Mark a queue as closed. No further IO is permitted.
* All blocks are released.
*/
void
qclose(struct queue *q)
{
struct block *bfirst;
if(q == NULL)
return;
/* mark it */
ilock(&q->lock);
q->state |= Qclosed;
q->state &= ~(Qflow|Qstarve);
strncpy(q->err, Ehungup, sizeof(q->err));
bfirst = q->bfirst;
q->bfirst = 0;
q->len = 0;
q->dlen = 0;
q->noblock = 0;
iunlock(&q->lock);
/* free queued blocks */
freeblist(bfirst);
/* wake up readers/writers */
rendez_wakeup(&q->rr);
rendez_wakeup(&q->wr);
}
/*
* Mark a queue as closed. Wakeup any readers. Don't remove queued
* blocks.
*/
void
qhangup(struct queue *q, char *msg)
{
/* mark it */
ilock(&q->lock);
q->state |= Qclosed;
if(msg == 0 || *msg == 0)
strncpy(q->err, Ehungup, sizeof(q->err));
else
strncpy(q->err, msg, ERRMAX-1);
iunlock(&q->lock);
/* wake up readers/writers */
rendez_wakeup(&q->rr);
rendez_wakeup(&q->wr);
}
/*
* return non-zero if the q is hungup
*/
int
qisclosed(struct queue *q)
{
return q->state & Qclosed;
}
/*
* mark a queue as no longer hung up
*/
void
qreopen(struct queue *q)
{
ilock(&q->lock);
q->state &= ~Qclosed;
q->state |= Qstarve;
q->eof = 0;
q->limit = q->inilim;
iunlock(&q->lock);
}
/*
* return bytes queued
*/
int
qlen(struct queue *q)
{
return q->dlen;
}
/*
* return space remaining before flow control
*/
int
qwindow(struct queue *q)
{
int l;
l = q->limit - q->dlen;
if(l < 0)
l = 0;
return l;
}
/*
* return true if we can read without blocking
*/
int
qcanread(struct queue *q)
{
return q->bfirst!=0;
}
/*
* change queue limit
*/
void
qsetlimit(struct queue *q, int limit)
{
q->limit = limit;
}
/*
* set blocking/nonblocking
*/
void
qnoblock(struct queue *q, int onoff)
{
q->noblock = onoff;
}
/*
* flush the output queue
*/
void
qflush(struct queue *q)
{
struct block *bfirst;
/* mark it */
ilock(&q->lock);
bfirst = q->bfirst;
q->bfirst = 0;
q->len = 0;
q->dlen = 0;
iunlock(&q->lock);
/* free queued blocks */
freeblist(bfirst);
/* wake up readers/writers */
rendez_wakeup(&q->wr);
}
int
qfull(struct queue *q)
{
return q->state & Qflow;
}
int
qstate(struct queue *q)
{
return q->state;
}