From a95079e0e71e2977db667e2cbf61a02f05f32882 Mon Sep 17 00:00:00 2001 From: Jo-Philipp Wich Date: Sun, 1 Sep 2013 19:58:10 +0200 Subject: [PATCH] Extend rpc_exec() to allow feeding childs stdin via callbacks, allow finish callback to override the ubus return code --- exec.c | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- exec.h | 34 ++++++++++++++++++++++++--------- 2 files changed, 82 insertions(+), 19 deletions(-) diff --git a/exec.c b/exec.c index 5ce7f15..944ade4 100644 --- a/exec.c +++ b/exec.c @@ -133,9 +133,10 @@ rpc_exec_reply(struct rpc_exec_context *c, int rv) } if (c->finish_cb) - c->finish_cb(&c->blob, c->stat, c->priv); + rv = c->finish_cb(&c->blob, c->stat, c->priv); - ubus_send_reply(c->context, &c->request, c->blob.head); + if (rv == UBUS_STATUS_OK) + ubus_send_reply(c->context, &c->request, c->blob.head); } ubus_complete_deferred_request(c->context, &c->request, rv); @@ -155,7 +156,7 @@ rpc_exec_reply(struct rpc_exec_context *c, int rv) } static void -rpc_exec_timestdout_cb(struct uloop_timeout *t) +rpc_exec_timeout_cb(struct uloop_timeout *t) { struct rpc_exec_context *c = container_of(t, struct rpc_exec_context, timeout); @@ -177,6 +178,19 @@ rpc_exec_process_cb(struct uloop_process *p, int stat) } static void +rpc_exec_ipipe_write_cb(struct ustream *s, int bytes) +{ + struct rpc_exec_context *c = + container_of(s, struct rpc_exec_context, ipipe.stream); + + if (c->stdin_cb(s, c->priv) <= 0) + { + ustream_free(&c->ipipe.stream); + close(c->ipipe.fd.fd); + } +} + +static void rpc_exec_opipe_read_cb(struct ustream *s, int bytes) { int len, rv; @@ -257,12 +271,14 @@ rpc_exec_epipe_state_cb(struct ustream *s) } int -rpc_exec(const char **args, rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, +rpc_exec(const char **args, rpc_exec_write_cb_t in, + rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, rpc_exec_done_cb_t end, void *priv, struct ubus_context *ctx, struct ubus_request_data *req) { pid_t pid; + int ipipe[2]; int opipe[2]; int epipe[2]; @@ -279,8 +295,14 @@ rpc_exec(const char **args, rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, if (!c) return UBUS_STATUS_UNKNOWN_ERROR; - if (pipe(opipe) || pipe(epipe)) - return rpc_errno_status(); + if (pipe(ipipe)) + goto fail_ipipe; + + if (pipe(opipe)) + goto fail_opipe; + + if (pipe(epipe)) + goto fail_epipe; switch ((pid = fork())) { @@ -290,10 +312,12 @@ rpc_exec(const char **args, rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, case 0: uloop_done(); + dup2(ipipe[0], 0); dup2(opipe[1], 1); dup2(epipe[1], 2); - close(0); + close(ipipe[0]); + close(ipipe[1]); close(opipe[0]); close(opipe[1]); close(epipe[0]); @@ -306,21 +330,33 @@ rpc_exec(const char **args, rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, memset(c, 0, sizeof(*c)); blob_buf_init(&c->blob, 0); + c->stdin_cb = in; c->stdout_cb = out; c->stderr_cb = err; c->finish_cb = end; c->priv = priv; - ustream_declare(c->opipe, opipe[0], opipe); - ustream_declare(c->epipe, epipe[0], epipe); + ustream_declare_read(c->opipe, opipe[0], opipe); + ustream_declare_read(c->epipe, epipe[0], epipe); c->process.pid = pid; c->process.cb = rpc_exec_process_cb; uloop_process_add(&c->process); - c->timeout.cb = rpc_exec_timestdout_cb; + c->timeout.cb = rpc_exec_timeout_cb; uloop_timeout_set(&c->timeout, RPC_EXEC_MAX_RUNTIME); + if (c->stdin_cb) + { + ustream_declare_write(c->ipipe, ipipe[1], ipipe); + rpc_exec_ipipe_write_cb(&c->ipipe.stream, 0); + } + else + { + close(ipipe[1]); + } + + close(ipipe[0]); close(opipe[1]); close(epipe[1]); @@ -329,4 +365,15 @@ rpc_exec(const char **args, rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, } return UBUS_STATUS_OK; + +fail_epipe: + close(opipe[0]); + close(opipe[1]); + +fail_opipe: + close(ipipe[0]); + close(ipipe[1]); + +fail_ipipe: + return rpc_errno_status(); } diff --git a/exec.h b/exec.h index c61e5e3..8642c0b 100644 --- a/exec.h +++ b/exec.h @@ -31,22 +31,36 @@ ptr != NULL && len > 0; \ ustream_consume(stream, len), ptr = ustream_get_read_buf(stream, &len)) -#define ustream_declare(us, fd, name) \ - us.stream.string_data = true; \ - us.stream.r.buffer_len = 4096; \ - us.stream.r.max_buffers = RPC_EXEC_MAX_SIZE / 4096; \ - us.stream.notify_read = rpc_exec_##name##_read_cb; \ - us.stream.notify_state = rpc_exec_##name##_state_cb; \ - ustream_fd_init(&us, fd); +#define ustream_declare_read(us, fd, name) \ + do { \ + us.stream.string_data = true; \ + us.stream.r.buffer_len = 4096; \ + us.stream.r.max_buffers = RPC_EXEC_MAX_SIZE / 4096; \ + us.stream.notify_read = rpc_exec_##name##_read_cb; \ + us.stream.notify_state = rpc_exec_##name##_state_cb; \ + ustream_fd_init(&us, fd); \ + } while(0) +#define ustream_declare_write(us, fd, name) \ + do { \ + us.stream.string_data = true; \ + us.stream.w.buffer_len = 4096; \ + us.stream.w.max_buffers = RPC_EXEC_MAX_SIZE / 4096; \ + us.stream.notify_write = rpc_exec_##name##_write_cb; \ + ustream_fd_init(&us, fd); \ + } while(0) + + +typedef int (*rpc_exec_write_cb_t)(struct ustream *, void *); typedef int (*rpc_exec_read_cb_t)(struct blob_buf *, char *, int, void *); -typedef void (*rpc_exec_done_cb_t)(struct blob_buf *, int, void *); +typedef int (*rpc_exec_done_cb_t)(struct blob_buf *, int, void *); struct rpc_exec_context { struct ubus_context *context; struct ubus_request_data request; struct uloop_timeout timeout; struct uloop_process process; + struct ustream_fd ipipe; struct ustream_fd opipe; struct ustream_fd epipe; int outlen; @@ -58,12 +72,14 @@ struct rpc_exec_context { bool blob_array; void *blob_cookie; struct blob_buf blob; + rpc_exec_write_cb_t stdin_cb; rpc_exec_read_cb_t stdout_cb; rpc_exec_read_cb_t stderr_cb; rpc_exec_done_cb_t finish_cb; }; -int rpc_exec(const char **args, rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, +int rpc_exec(const char **args, rpc_exec_write_cb_t in, + rpc_exec_read_cb_t out, rpc_exec_read_cb_t err, rpc_exec_done_cb_t end, void *priv, struct ubus_context *ctx, struct ubus_request_data *req); -- 2.11.0