X-Git-Url: http://git.archive.openwrt.org/?p=project%2Fubus.git;a=blobdiff_plain;f=libubus.c;h=590a0fa2ec5dbdf6e008545c0b19bc4f421e640d;hp=aed615d184c666b4d96801e4eed3fda387a8997b;hb=c6f705451527e6e5e8f5a2715f11478eeb8799e4;hpb=cc82d8999515b87d7e4d8ff78f187f7451a5ced5 diff --git a/libubus.c b/libubus.c index aed615d..590a0fa 100644 --- a/libubus.c +++ b/libubus.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -55,6 +56,13 @@ struct ubus_pending_data { struct blob_attr data[]; }; +struct ubus_pending_invoke { + struct list_head list; + struct ubus_msghdr hdr; +}; + +static void ubus_process_pending_invoke(struct ubus_context *ctx); + static int ubus_cmp_id(const void *k1, const void *k2, void *ptr) { const uint32_t *id1 = k1, *id2 = k2; @@ -88,6 +96,14 @@ out: return err; } +static void wait_data(int fd, bool write) +{ + struct pollfd pfd = { .fd = fd }; + + pfd.events = write ? POLLOUT : POLLIN; + poll(&pfd, 1, 0); +} + static int writev_retry(int fd, struct iovec *iov, int iov_len) { int len = 0; @@ -97,9 +113,7 @@ static int writev_retry(int fd, struct iovec *iov, int iov_len) if (cur_len < 0) { switch(errno) { case EAGAIN: - /* turn off non-blocking mode */ - fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & - ~O_NONBLOCK); + wait_data(fd, true); break; case EINTR: break; @@ -165,6 +179,9 @@ static bool recv_retry(int fd, struct iovec *iov, bool wait) int bytes; while (iov->iov_len > 0) { + if (wait) + wait_data(fd, false); + bytes = read(fd, iov->iov_base, iov->iov_len); if (bytes < 0) { bytes = 0; @@ -272,7 +289,7 @@ static int ubus_process_req_status(struct ubus_request *req, struct ubus_msghdr int ret = UBUS_STATUS_INVALID_ARGUMENT; if (!list_empty(&req->list)) - list_del(&req->list); + list_del_init(&req->list); ubus_get_status(hdr, &ret); req->peer = hdr->peer; @@ -332,6 +349,8 @@ static void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hd int method; int ret = 0; + req.peer = hdr->peer; + req.seq = hdr->seq; ubus_parse_msg(hdr->data); if (!attrbuf[UBUS_ATTR_OBJID]) @@ -362,8 +381,6 @@ static void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hd found: req.object = objid; - req.peer = hdr->peer; - req.seq = hdr->seq; ret = obj->methods[method].handler(ctx, obj, &req, blob_data(attrbuf[UBUS_ATTR_METHOD]), attrbuf[UBUS_ATTR_DATA]); @@ -372,11 +389,12 @@ send: blob_buf_init(&b, 0); blob_put_int32(&b, UBUS_ATTR_STATUS, ret); blob_put_int32(&b, UBUS_ATTR_OBJID, objid); - ubus_send_msg(ctx, hdr->seq, b.head, UBUS_MSG_STATUS, hdr->peer); + ubus_send_msg(ctx, req.seq, b.head, UBUS_MSG_STATUS, req.peer); } static void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) { + struct ubus_pending_invoke *pending; struct ubus_request *req; switch(hdr->type) { @@ -395,11 +413,36 @@ static void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) break; case UBUS_MSG_INVOKE: - ubus_process_invoke(ctx, hdr); + if (ctx->stack_depth > 2) { + pending = calloc(1, sizeof(*pending) + + blob_raw_len(hdr->data)); + + if (!pending) + return; + + memcpy(&pending->hdr, hdr, sizeof(*hdr) + + blob_raw_len(hdr->data)); + list_add(&pending->list, &ctx->pending); + } else { + ubus_process_invoke(ctx, hdr); + } break; } } +static void ubus_process_pending_invoke(struct ubus_context *ctx) +{ + struct ubus_pending_invoke *pending, *tmp; + + list_for_each_entry_safe(pending, tmp, &ctx->pending, list) { + list_del(&pending->list); + ubus_process_msg(ctx, &pending->hdr); + free(pending); + if (ctx->stack_depth > 2) + break; + } +} + void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) { if (!list_empty(&req->list)) @@ -407,7 +450,7 @@ void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) req->cancelled = true; ubus_process_req_data(req); - list_del(&req->list); + list_del_init(&req->list); } void ubus_complete_request_async(struct ubus_context *ctx, struct ubus_request *req) @@ -459,7 +502,6 @@ int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, struct ubus_sync_req_cb cb; ubus_complete_handler_t complete_cb = req->complete_cb; bool registered = ctx->sock.registered; - bool cancelled = uloop_cancelled; int status = UBUS_STATUS_NO_DATA; if (!registered) { @@ -477,7 +519,14 @@ int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, ubus_complete_request_async(ctx, req); req->complete_cb = ubus_sync_req_cb; - uloop_run(); + ctx->stack_depth++; + while (!req->status_msg) { + bool cancelled = uloop_cancelled; + uloop_cancelled = false; + uloop_run(); + uloop_cancelled = cancelled; + } + ctx->stack_depth--; if (timeout) uloop_timeout_cancel(&cb.timeout); @@ -489,10 +538,12 @@ int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, if (req->complete_cb) req->complete_cb(req, status); - uloop_cancelled = cancelled; if (!registered) uloop_fd_delete(&ctx->sock); + if (!ctx->stack_depth) + ubus_process_pending_invoke(ctx); + return status; } @@ -658,7 +709,7 @@ static bool ubus_push_object_type(const struct ubus_object_type *type) return true; } -static int __ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) +int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) { struct ubus_request req; int ret; @@ -689,14 +740,6 @@ static int __ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) return 0; } -int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) -{ - if (!obj->name || !obj->type) - return UBUS_STATUS_INVALID_ARGUMENT; - - return __ubus_add_object(ctx, obj); -} - static void ubus_remove_object_cb(struct ubus_request *req, int type, struct blob_attr *msg) { struct ubus_object *obj = req->priv; @@ -768,7 +811,7 @@ int ubus_register_event_handler(struct ubus_context *ctx, if (!!obj->name ^ !!obj->type) return UBUS_STATUS_INVALID_ARGUMENT; - ret = __ubus_add_object(ctx, obj); + ret = ubus_add_object(ctx, obj); if (ret) return ret; } @@ -784,6 +827,81 @@ int ubus_register_event_handler(struct ubus_context *ctx, NULL, NULL, 0); } +enum { + WATCH_ID, + WATCH_NOTIFY, + __WATCH_MAX +}; + +static const struct blobmsg_policy watch_policy[] = { + [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, + [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING }, +}; + + +static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, + const char *method, struct blob_attr *msg) +{ + struct ubus_watch_object *w; + struct blob_attr *tb[__WATCH_MAX]; + + blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg)); + + if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY]) + return UBUS_STATUS_INVALID_ARGUMENT; + + if (req->peer) + return UBUS_STATUS_INVALID_ARGUMENT; + + w = container_of(obj, struct ubus_watch_object, obj); + w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID])); + return 0; +} + +static const struct ubus_method watch_method = { + .name = NULL, + .handler = ubus_watch_cb, +}; + +int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj) +{ + struct ubus_object *obj = &w_obj->obj; + + obj->methods = &watch_method; + obj->n_methods = 1; + + return ubus_add_object(ctx, obj); +} + +static int +__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type) +{ + struct ubus_request req; + + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); + blob_put_int32(&b, UBUS_ATTR_TARGET, id); + if (method) + blob_put_string(&b, UBUS_ATTR_METHOD, method); + + if (ubus_start_request(ctx, &req, b.head, type, 0) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + return ubus_complete_request(ctx, &req, 0); + +} + +int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +{ + return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH); +} + +int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +{ + return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH); +} + int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data) { @@ -855,11 +973,14 @@ struct ubus_context *ubus_connect(const char *path) ctx->connection_lost = ubus_default_connection_lost; INIT_LIST_HEAD(&ctx->requests); + INIT_LIST_HEAD(&ctx->pending); avl_init(&ctx->objects, ubus_cmp_id, false, NULL); if (!ctx->local_id) goto error_close; + fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK); + return ctx; error_free_buf: