From 768a69b3cedfebde10825847e42f35ed4aee1856 Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Sun, 21 Oct 2012 23:00:06 +0200 Subject: [PATCH] add ustream, an api for stream buffer management --- CMakeLists.txt | 2 +- ustream-fd.c | 138 ++++++++++++++++ ustream.c | 500 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ustream.h | 183 +++++++++++++++++++++ 4 files changed, 822 insertions(+), 1 deletion(-) create mode 100644 ustream-fd.c create mode 100644 ustream.c create mode 100644 ustream.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5bed1e2..93ee787 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ IF(APPLE) LINK_DIRECTORIES(/opt/local/lib) ENDIF() -SET(SOURCES avl.c avl-cmp.c blob.c blobmsg.c uloop.c usock.c vlist.c) +SET(SOURCES avl.c avl-cmp.c blob.c blobmsg.c uloop.c usock.c ustream.c ustream-fd.c vlist.c) ADD_LIBRARY(ubox SHARED ${SOURCES}) diff --git a/ustream-fd.c b/ustream-fd.c new file mode 100644 index 0000000..c2e70db --- /dev/null +++ b/ustream-fd.c @@ -0,0 +1,138 @@ +/* + * ustream - library for stream buffer management + * + * Copyright (C) 2012 Felix Fietkau + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include +#include "ustream.h" + +static void ustream_fd_set_uloop(struct ustream *s) +{ + struct ustream_fd *sf = container_of(s, struct ustream_fd, stream); + struct ustream_buf *buf; + unsigned int flags = ULOOP_EDGE_TRIGGER; + + if (!s->read_blocked && !s->eof) + flags |= ULOOP_READ; + + buf = s->w.head; + if (buf && s->w.data_bytes && !s->write_error) + flags |= ULOOP_WRITE; + + uloop_fd_add(&sf->fd, flags); + + if (flags & ULOOP_READ) + sf->fd.cb(&sf->fd, ULOOP_READ); +} + +static void ustream_fd_read_pending(struct ustream_fd *sf, bool *update) +{ + struct ustream *s = &sf->stream; + int buflen = 0; + ssize_t len; + char *buf; + + do { + buf = ustream_reserve(s, 1, &buflen); + if (!buf) + break; + + len = read(sf->fd.fd, buf, buflen); + if (!len) { + sf->fd.eof = true; + return; + } + + if (len < 0) { + if (errno == EINTR) + continue; + + if (errno == EAGAIN) + return; + } + + ustream_fill_read(s, len); + } while (1); +} + +static int ustream_fd_write(struct ustream *s, const char *buf, int buflen, bool more) +{ + struct ustream_fd *sf = container_of(s, struct ustream_fd, stream); + ssize_t len; + + if (!buflen) + return 0; + +retry: + len = write(sf->fd.fd, buf, buflen); + if (!len) + goto retry; + + if (len < 0) { + if (errno == EINTR) + goto retry; + + if (errno == EAGAIN || errno == EWOULDBLOCK) + return 0; + } + + return len; +} + +static void ustream_uloop_cb(struct uloop_fd *fd, unsigned int events) +{ + struct ustream_fd *sf = container_of(fd, struct ustream_fd, fd); + struct ustream *s = &sf->stream; + bool update = false; + + if (events & ULOOP_READ) + ustream_fd_read_pending(sf, &update); + + if (events & ULOOP_WRITE) { + if (ustream_write_pending(s)) + ustream_fd_set_uloop(s); + } + + if (!s->eof && fd->eof) { + s->eof = true; + ustream_fd_set_uloop(s); + ustream_state_change(s); + } +} + + +static void ustream_fd_free(struct ustream *s) +{ + struct ustream_fd *sf = container_of(s, struct ustream_fd, stream); + + uloop_fd_delete(&sf->fd); +} + +void ustream_fd_init(struct ustream_fd *sf, int fd) +{ + struct ustream *s = &sf->stream; + + ustream_init_defaults(s); + + sf->fd.fd = fd; + sf->fd.cb = ustream_uloop_cb; + s->set_read_blocked = ustream_fd_set_uloop; + s->write = ustream_fd_write; + s->free = ustream_fd_free; + ustream_fd_set_uloop(s); +} diff --git a/ustream.c b/ustream.c new file mode 100644 index 0000000..a58069d --- /dev/null +++ b/ustream.c @@ -0,0 +1,500 @@ +/* + * ustream - library for stream buffer management + * + * Copyright (C) 2012 Felix Fietkau + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include +#include +#include + +#include "ustream.h" + +void ustream_init_buf(struct ustream_buf *buf, int len) +{ + if (!len) + abort(); + + memset(buf, 0, sizeof(*buf)); + buf->data = buf->tail = buf->head; + buf->end = buf->head + len; + *buf->head = 0; +} + +static void ustream_add_buf(struct ustream_buf_list *l, struct ustream_buf *buf) +{ + if (!l->tail) + l->head = buf; + else + l->tail->next = buf; + + buf->next = NULL; + l->tail = buf; + if (!l->data_tail) + l->data_tail = l->head; +} + +static bool ustream_can_alloc(struct ustream_buf_list *l) +{ + if (l->max_buffers <= 0) + return true; + + return (l->buffers < l->max_buffers); +} + +static int ustream_alloc_default(struct ustream *s, struct ustream_buf_list *l) +{ + struct ustream_buf *buf; + + if (!ustream_can_alloc(l)) + return -1; + + buf = malloc(sizeof(*buf) + l->buffer_len + s->string_data); + ustream_init_buf(buf, l->buffer_len); + ustream_add_buf(l, buf); + + return 0; +} + +static void ustream_free_buffers(struct ustream_buf_list *l) +{ + struct ustream_buf *buf = l->head; + + while (buf) { + struct ustream_buf *next = buf->next; + + free(buf); + buf = next; + } + l->head = NULL; + l->tail = NULL; + l->data_tail = NULL; +} + +void ustream_free(struct ustream *s) +{ + if (s->free) + s->free(s); + + uloop_timeout_cancel(&s->state_change); + ustream_free_buffers(&s->r); + ustream_free_buffers(&s->w); + s->write_error = false; + s->eof = false; + s->read_blocked = 0; +} + +static void ustream_state_change_cb(struct uloop_timeout *t) +{ + struct ustream *s = container_of(t, struct ustream, state_change); + + if (s->write_error) + ustream_free_buffers(&s->w); + s->notify_state(s); +} + +void ustream_init_defaults(struct ustream *s) +{ +#define DEFAULT_SET(_f, _default) \ + do { \ + if (!_f) \ + _f = _default; \ + } while(0) + + DEFAULT_SET(s->r.alloc, ustream_alloc_default); + DEFAULT_SET(s->w.alloc, ustream_alloc_default); + + DEFAULT_SET(s->r.min_buffers, 1); + DEFAULT_SET(s->r.max_buffers, 1); + DEFAULT_SET(s->r.buffer_len, 4096); + + DEFAULT_SET(s->w.min_buffers, 2); + DEFAULT_SET(s->w.max_buffers, -1); + DEFAULT_SET(s->w.buffer_len, 256); + +#undef DEFAULT_SET + + s->state_change.cb = ustream_state_change_cb; +} + +static bool ustream_should_move(struct ustream_buf_list *l, struct ustream_buf *buf, int len) +{ + int maxlen; + int offset; + + if (buf->data == buf->head) + return false; + + maxlen = buf->end - buf->head; + offset = buf->data - buf->head; + + if (offset > maxlen / 2) + return true; + + if (buf->tail - buf->data < 32 && offset > maxlen / 4) + return true; + + if (buf != l->tail || ustream_can_alloc(l)) + return false; + + return (buf->end - buf->tail < len); +} + +static void ustream_free_buf(struct ustream_buf_list *l, struct ustream_buf *buf) +{ + if (buf == l->head) + l->head = buf->next; + + if (buf == l->data_tail) + l->data_tail = buf->next; + + if (buf == l->tail) + l->tail = NULL; + + if (--l->buffers >= l->min_buffers) { + free(buf); + return; + } + + /* recycle */ + ustream_init_buf(buf, buf->end - buf->head); + ustream_add_buf(l, buf); +} + +static void __ustream_set_read_blocked(struct ustream *s, unsigned char val) +{ + bool changed = !!s->read_blocked != !!val; + + s->read_blocked = val; + if (changed) + s->set_read_blocked(s); +} + +void ustream_set_read_blocked(struct ustream *s, bool set) +{ + unsigned char val = s->read_blocked & ~READ_BLOCKED_USER; + + if (set) + val |= READ_BLOCKED_USER; + + __ustream_set_read_blocked(s, val); +} + +void ustream_consume(struct ustream *s, int len) +{ + struct ustream_buf *buf = s->r.head; + + if (!len) + return; + + s->r.data_bytes -= len; + if (s->r.data_bytes < 0) + abort(); + + do { + struct ustream_buf *next = buf->next; + int buf_len = buf->tail - buf->data; + + if (len < buf_len) { + buf->data += len; + break; + } + + len -= buf_len; + ustream_free_buf(&s->r, buf); + buf = next; + } while(len); + + __ustream_set_read_blocked(s, s->read_blocked & ~READ_BLOCKED_FULL); +} + +static void ustream_fixup_string(struct ustream *s, struct ustream_buf *buf) +{ + if (!s->string_data) + return; + + *buf->tail = 0; +} + +static bool ustream_prepare_buf(struct ustream *s, struct ustream_buf_list *l, int len) +{ + struct ustream_buf *buf; + + buf = l->data_tail; + if (buf) { + if (ustream_should_move(l, buf, len)) { + int len = buf->tail - buf->data; + + memmove(buf->head, buf->data, len); + buf->data = buf->head; + buf->tail = buf->data + len; + + if (l == &s->r) + ustream_fixup_string(s, buf); + } + if (buf->tail != buf->end) + return true; + } + + if (buf && buf->next) { + l->data_tail = buf->next; + return true; + } + + if (!ustream_can_alloc(l)) + return false; + + if (l->alloc(s, l) < 0) + return false; + + l->data_tail = l->tail; + return true; +} + +char *ustream_reserve(struct ustream *s, int len, int *maxlen) +{ + struct ustream_buf *buf = s->r.head; + + if (!ustream_prepare_buf(s, &s->r, len)) { + __ustream_set_read_blocked(s, s->read_blocked | READ_BLOCKED_FULL); + *maxlen = 0; + return NULL; + } + + buf = s->r.data_tail; + *maxlen = buf->end - buf->tail; + return buf->tail; +} + +void ustream_fill_read(struct ustream *s, int len) +{ + struct ustream_buf *buf = s->r.data_tail; + int n = len; + int maxlen; + + s->r.data_bytes += len; + do { + if (!buf) + abort(); + + maxlen = buf->end - buf->tail; + if (len < maxlen) + maxlen = len; + + len -= maxlen; + buf->tail += maxlen; + ustream_fixup_string(s, buf); + + s->r.data_tail = buf; + buf = buf->next; + } while (len); + + if (s->notify_read) + s->notify_read(s, n); +} + +char *ustream_get_read_buf(struct ustream *s, int *buflen) +{ + char *data; + int len; + + if (s->r.head) { + len = s->r.head->tail - s->r.head->data; + data = s->r.head->data; + } else { + len = 0; + data = NULL; + } + + if (buflen) + *buflen = len; + + return data; +} + +static void ustream_write_error(struct ustream *s) +{ + s->write_error = true; + ustream_state_change(s); +} + +bool ustream_write_pending(struct ustream *s) +{ + struct ustream_buf *buf = s->w.head; + int wr = 0, len; + + if (s->write_error) + return false; + + while (buf) { + struct ustream_buf *next = buf->next; + int maxlen = buf->tail - buf->data; + + len = s->write(s, buf->data, maxlen, !!buf->next); + if (len < 0) { + ustream_write_error(s); + break; + } + + if (len == 0) + break; + + wr += len; + s->w.data_bytes -= len; + if (len < maxlen) { + buf->data += len; + break; + } + + ustream_free_buf(&s->w, buf); + buf = next; + } + + if (s->notify_write) + s->notify_write(s, wr); + + if (s->eof && wr && !s->w.data_bytes) + ustream_state_change(s); + + return !s->w.data_bytes; +} + +static int ustream_write_buffered(struct ustream *s, const char *data, int len, int wr) +{ + struct ustream_buf_list *l = &s->w; + struct ustream_buf *buf; + int maxlen; + + while (len) { + if (!ustream_prepare_buf(s, &s->w, len)) + break; + + buf = l->data_tail; + + maxlen = buf->end - buf->tail; + if (maxlen > len) + maxlen = len; + + memcpy(buf->tail, data, maxlen); + buf->tail += maxlen; + data += maxlen; + len -= maxlen; + wr += maxlen; + l->data_bytes += maxlen; + } + + return wr; +} + +int ustream_write(struct ustream *s, const char *data, int len, bool more) +{ + struct ustream_buf_list *l = &s->w; + int wr; + + if (s->write_error) + return 0; + + if (!l->data_bytes) { + wr = s->write(s, data, len, more); + if (wr == len) + return wr; + + if (wr < 0) { + ustream_write_error(s); + return wr; + } + + data += wr; + len -= wr; + } + + return ustream_write_buffered(s, data, len, wr); +} + +#define MAX_STACK_BUFLEN 256 + +int ustream_vprintf(struct ustream *s, const char *format, va_list arg) +{ + struct ustream_buf_list *l = &s->w; + char *buf; + va_list arg2; + int wr, maxlen, buflen; + + if (!l->data_bytes) { + buf = alloca(MAX_STACK_BUFLEN); + va_copy(arg2, arg); + maxlen = vsnprintf(buf, MAX_STACK_BUFLEN, format, arg2); + va_end(arg2); + if (maxlen < MAX_STACK_BUFLEN) { + wr = s->write(s, buf, maxlen, false); + if (wr < 0) { + ustream_write_error(s); + return wr; + } + if (wr == maxlen) + return wr; + + buf += wr; + maxlen -= wr; + return ustream_write_buffered(s, buf, maxlen, wr); + } else { + buf = malloc(maxlen + 1); + wr = vsnprintf(buf, maxlen + 1, format, arg); + wr = ustream_write(s, buf, wr, false); + free(buf); + return wr; + } + } + + if (!ustream_prepare_buf(s, l, 1)) + return 0; + + buf = l->data_tail->tail; + buflen = l->data_tail->end - buf; + + va_copy(arg2, arg); + maxlen = vsnprintf(buf, buflen, format, arg2); + va_end(arg2); + + wr = maxlen; + if (wr >= buflen) + wr = buflen - 1; + + l->data_tail->tail += wr; + l->data_bytes += wr; + if (maxlen < buflen) + return wr; + + buf = malloc(maxlen + 1); + maxlen = vsnprintf(buf, maxlen + 1, format, arg); + wr = ustream_write_buffered(s, buf + wr, maxlen - wr, wr); + free(buf); + + return wr; +} + +int ustream_printf(struct ustream *s, const char *format, ...) +{ + va_list arg; + int ret; + + va_start(arg, format); + ret = ustream_vprintf(s, format, arg); + va_end(arg); + + return ret; +} diff --git a/ustream.h b/ustream.h new file mode 100644 index 0000000..c9e23e8 --- /dev/null +++ b/ustream.h @@ -0,0 +1,183 @@ +/* + * ustream - library for stream buffer management + * + * Copyright (C) 2012 Felix Fietkau + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef __USTREAM_H +#define __USTREAM_H + +#include "uloop.h" + +struct ustream; +struct ustream_buf; + +enum read_blocked_reason { + READ_BLOCKED_USER = (1 << 0), + READ_BLOCKED_FULL = (1 << 1), +}; + +struct ustream_buf_list { + struct ustream_buf *head; + struct ustream_buf *data_tail; + struct ustream_buf *tail; + + int (*alloc)(struct ustream *s, struct ustream_buf_list *l); + + int data_bytes; + + int min_buffers; + int max_buffers; + int buffer_len; + + int buffers; +}; + +struct ustream { + struct ustream_buf_list r, w; + struct uloop_timeout state_change; + + /* + * notify_read: + * called by the ustream core to notify that new data is available + * for reading. + * must not free the ustream from this callback + */ + void (*notify_read)(struct ustream *s, int bytes_new); + + /* + * notify_write: (optional) + * called by the ustream core to notify that some buffered data has + * been written to the stream. + * must not free the ustream from this callback + */ + void (*notify_write)(struct ustream *s, int bytes); + + /* + * notify_state: + * called by the ustream implementation to notify that the read + * side of the stream is closed (eof is set) or there was a write + * error (write_error is set). + * will be called again after the write buffer has been emptied when + * the read side has hit EOF. + */ + void (*notify_state)(struct ustream *s); + + /* + * write: + * must be defined by ustream implementation, accepts new write data. + * 'more' is used to indicate that a subsequent call will provide more + * data (useful for aggregating writes) + * returns the number of bytes accepted, or -1 if no more writes can + * be accepted (link error) + */ + int (*write)(struct ustream *s, const char *buf, int len, bool more); + + /* + * free: (optional) + * defined by ustream implementation, tears down the ustream and frees data + */ + void (*free)(struct ustream *s); + + /* + * set_read_blocked: (optional) + * defined by ustream implementation, called when the read_blocked flag + * changes + */ + void (*set_read_blocked)(struct ustream *s); + + /* + * ustream user should set this if the input stream is expected + * to contain string data. the core will keep all data 0-terminated. + */ + bool string_data; + bool write_error; + bool eof, eof_write_done; + + enum read_blocked_reason read_blocked; +}; + +struct ustream_fd { + struct ustream stream; + struct uloop_fd fd; +}; + +struct ustream_buf { + struct ustream_buf *next; + + char *data; + char *tail; + char *end; + + char head[]; +}; + +/* ustream_fd_init: create a file descriptor ustream (uses uloop) */ +void ustream_fd_init(struct ustream_fd *s, int fd); + +/* ustream_free: free all buffers and data associated with a ustream */ +void ustream_free(struct ustream *s); + +/* ustream_consume: remove data from the head of the read buffer */ +void ustream_consume(struct ustream *s, int len); + +/* ustream_write: add data to the write buffer */ +int ustream_write(struct ustream *s, const char *buf, int len, bool more); +int ustream_printf(struct ustream *s, const char *format, ...); +int ustream_vprintf(struct ustream *s, const char *format, va_list arg); + +/* ustream_get_read_buf: get a pointer to the next read buffer data */ +char *ustream_get_read_buf(struct ustream *s, int *buflen); + +/* + * ustream_set_read_blocked: set read blocked state + * + * if set, the ustream will no longer fetch pending data. + */ +void ustream_set_read_blocked(struct ustream *s, bool set); + +static inline bool ustream_read_blocked(struct ustream *s) +{ + return !!(s->read_blocked & READ_BLOCKED_USER); +} + +/*** --- functions only used by ustream implementations --- ***/ + +/* ustream_init_defaults: fill default callbacks and options */ +void ustream_init_defaults(struct ustream *s); + +/* + * ustream_reserve: allocate rx buffer space + * + * len: hint for how much space is needed (not guaranteed to be met) + * maxlen: pointer to where the actual buffer size is going to be stored + */ +char *ustream_reserve(struct ustream *s, int len, int *maxlen); + +/* ustream_fill_read: mark rx buffer space as filled */ +void ustream_fill_read(struct ustream *s, int len); + +/* + * ustream_write_pending: attempt to write more data from write buffers + * returns true if all write buffers have been emptied. + */ +bool ustream_write_pending(struct ustream *s); + +static inline void ustream_state_change(struct ustream *s) +{ + uloop_timeout_set(&s->state_change, 0); +} + +#endif -- 2.11.0