X-Git-Url: http://git.archive.openwrt.org/?a=blobdiff_plain;f=libubus.c;h=5488f3b8dcc6c1f05ed0250aa3bbe2c19ec18e0c;hb=0371c8f5fde821f06948116383f0073f8b7f67ba;hp=5c6bd8a3cc196ac1481af896b31cd0e79c9101b6;hpb=24505f172a36050f79bea275abc5a3945183f3e9;p=project%2Fubus.git diff --git a/libubus.c b/libubus.c index 5c6bd8a..5488f3b 100644 --- a/libubus.c +++ b/libubus.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,8 @@ const char *__ubus_strerror[__UBUS_STATUS_LAST] = { [UBUS_STATUS_PERMISSION_DENIED] = "Permission denied", [UBUS_STATUS_TIMEOUT] = "Request timed out", [UBUS_STATUS_NOT_SUPPORTED] = "Operation not supported", + [UBUS_STATUS_UNKNOWN_ERROR] = "Unknown error", + [UBUS_STATUS_CONNECTION_FAILED] = "Connection failed", }; static struct blob_buf b; @@ -54,6 +57,13 @@ struct ubus_pending_data { struct blob_attr data[]; }; +struct ubus_pending_invoke { + struct list_head list; + struct ubus_msghdr hdr; +}; + +static void ubus_process_pending_invoke(struct ubus_context *ctx); + static int ubus_cmp_id(const void *k1, const void *k2, void *ptr) { const uint32_t *id1 = k1, *id2 = k2; @@ -87,6 +97,14 @@ out: return err; } +static void wait_data(int fd, bool write) +{ + struct pollfd pfd = { .fd = fd }; + + pfd.events = write ? POLLOUT : POLLIN; + poll(&pfd, 1, 0); +} + static int writev_retry(int fd, struct iovec *iov, int iov_len) { int len = 0; @@ -96,9 +114,7 @@ static int writev_retry(int fd, struct iovec *iov, int iov_len) if (cur_len < 0) { switch(errno) { case EAGAIN: - /* turn off non-blocking mode */ - fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & - ~O_NONBLOCK); + wait_data(fd, true); break; case EINTR: break; @@ -164,6 +180,9 @@ static bool recv_retry(int fd, struct iovec *iov, bool wait) int bytes; while (iov->iov_len > 0) { + if (wait) + wait_data(fd, false); + bytes = read(fd, iov->iov_base, iov->iov_len); if (bytes < 0) { bytes = 0; @@ -271,7 +290,7 @@ static int ubus_process_req_status(struct ubus_request *req, struct ubus_msghdr int ret = UBUS_STATUS_INVALID_ARGUMENT; if (!list_empty(&req->list)) - list_del(&req->list); + list_del_init(&req->list); ubus_get_status(hdr, &ret); req->peer = hdr->peer; @@ -331,6 +350,8 @@ static void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hd int method; int ret = 0; + req.peer = hdr->peer; + req.seq = hdr->seq; ubus_parse_msg(hdr->data); if (!attrbuf[UBUS_ATTR_OBJID]) @@ -361,8 +382,6 @@ static void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hd found: req.object = objid; - req.peer = hdr->peer; - req.seq = hdr->seq; ret = obj->methods[method].handler(ctx, obj, &req, blob_data(attrbuf[UBUS_ATTR_METHOD]), attrbuf[UBUS_ATTR_DATA]); @@ -371,11 +390,12 @@ send: blob_buf_init(&b, 0); blob_put_int32(&b, UBUS_ATTR_STATUS, ret); blob_put_int32(&b, UBUS_ATTR_OBJID, objid); - ubus_send_msg(ctx, hdr->seq, b.head, UBUS_MSG_STATUS, hdr->peer); + ubus_send_msg(ctx, req.seq, b.head, UBUS_MSG_STATUS, req.peer); } static void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) { + struct ubus_pending_invoke *pending; struct ubus_request *req; switch(hdr->type) { @@ -394,11 +414,36 @@ static void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) break; case UBUS_MSG_INVOKE: - ubus_process_invoke(ctx, hdr); + if (ctx->stack_depth > 2) { + pending = calloc(1, sizeof(*pending) + + blob_raw_len(hdr->data)); + + if (!pending) + return; + + memcpy(&pending->hdr, hdr, sizeof(*hdr) + + blob_raw_len(hdr->data)); + list_add(&pending->list, &ctx->pending); + } else { + ubus_process_invoke(ctx, hdr); + } break; } } +static void ubus_process_pending_invoke(struct ubus_context *ctx) +{ + struct ubus_pending_invoke *pending, *tmp; + + list_for_each_entry_safe(pending, tmp, &ctx->pending, list) { + list_del(&pending->list); + ubus_process_msg(ctx, &pending->hdr); + free(pending); + if (ctx->stack_depth > 2) + break; + } +} + void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) { if (!list_empty(&req->list)) @@ -406,7 +451,7 @@ void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) req->cancelled = true; ubus_process_req_data(req); - list_del(&req->list); + list_del_init(&req->list); } void ubus_complete_request_async(struct ubus_context *ctx, struct ubus_request *req) @@ -458,7 +503,6 @@ int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, struct ubus_sync_req_cb cb; ubus_complete_handler_t complete_cb = req->complete_cb; bool registered = ctx->sock.registered; - bool cancelled = uloop_cancelled; int status = UBUS_STATUS_NO_DATA; if (!registered) { @@ -476,7 +520,14 @@ int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, ubus_complete_request_async(ctx, req); req->complete_cb = ubus_sync_req_cb; - uloop_run(); + ctx->stack_depth++; + while (!req->status_msg) { + bool cancelled = uloop_cancelled; + uloop_cancelled = false; + uloop_run(); + uloop_cancelled = cancelled; + } + ctx->stack_depth--; if (timeout) uloop_timeout_cancel(&cb.timeout); @@ -488,10 +539,12 @@ int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, if (req->complete_cb) req->complete_cb(req, status); - uloop_cancelled = cancelled; if (!registered) uloop_fd_delete(&ctx->sock); + if (!ctx->stack_depth) + ubus_process_pending_invoke(ctx); + return status; } @@ -657,7 +710,7 @@ static bool ubus_push_object_type(const struct ubus_object_type *type) return true; } -static int __ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) +int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) { struct ubus_request req; int ret; @@ -688,14 +741,6 @@ static int __ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) return 0; } -int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) -{ - if (!obj->name || !obj->type) - return UBUS_STATUS_INVALID_ARGUMENT; - - return __ubus_add_object(ctx, obj); -} - static void ubus_remove_object_cb(struct ubus_request *req, int type, struct blob_attr *msg) { struct ubus_object *obj = req->priv; @@ -767,7 +812,7 @@ int ubus_register_event_handler(struct ubus_context *ctx, if (!!obj->name ^ !!obj->type) return UBUS_STATUS_INVALID_ARGUMENT; - ret = __ubus_add_object(ctx, obj); + ret = ubus_add_object(ctx, obj); if (ret) return ret; } @@ -783,6 +828,81 @@ int ubus_register_event_handler(struct ubus_context *ctx, NULL, NULL, 0); } +enum { + WATCH_ID, + WATCH_NOTIFY, + __WATCH_MAX +}; + +static const struct blobmsg_policy watch_policy[] = { + [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, + [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING }, +}; + + +static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, + const char *method, struct blob_attr *msg) +{ + struct ubus_watch_object *w; + struct blob_attr *tb[__WATCH_MAX]; + + blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg)); + + if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY]) + return UBUS_STATUS_INVALID_ARGUMENT; + + if (req->peer) + return UBUS_STATUS_INVALID_ARGUMENT; + + w = container_of(obj, struct ubus_watch_object, obj); + w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID])); + return 0; +} + +static const struct ubus_method watch_method = { + .name = NULL, + .handler = ubus_watch_cb, +}; + +int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj) +{ + struct ubus_object *obj = &w_obj->obj; + + obj->methods = &watch_method; + obj->n_methods = 1; + + return ubus_add_object(ctx, obj); +} + +static int +__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type) +{ + struct ubus_request req; + + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); + blob_put_int32(&b, UBUS_ATTR_TARGET, id); + if (method) + blob_put_string(&b, UBUS_ATTR_METHOD, method); + + if (ubus_start_request(ctx, &req, b.head, type, 0) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + return ubus_complete_request(ctx, &req, 0); + +} + +int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +{ + return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH); +} + +int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +{ + return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH); +} + int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data) { @@ -803,72 +923,108 @@ int ubus_send_event(struct ubus_context *ctx, const char *id, return ubus_complete_request(ctx, &req, 0); } -static void ubus_default_connection_lost(struct ubus_context *ctx) +static void +ubus_refresh_state(struct ubus_context *ctx) { - if (ctx->sock.registered) - uloop_end(); + struct ubus_object *obj, *tmp; + + /* clear all type IDs, they need to be registered again */ + avl_for_each_element(&ctx->objects, obj, avl) + obj->type->id = 0; + + /* push out all objects again */ + avl_for_each_element_safe(&ctx->objects, obj, avl, tmp) { + obj->id = 0; + avl_delete(&ctx->objects, &obj->avl); + ubus_add_object(ctx, obj); + } } -struct ubus_context *ubus_connect(const char *path) +int ubus_reconnect(struct ubus_context *ctx, const char *path) { - struct ubus_context *ctx; struct { struct ubus_msghdr hdr; struct blob_attr data; } hdr; struct blob_attr *buf; + int ret = UBUS_STATUS_UNKNOWN_ERROR; if (!path) path = UBUS_UNIX_SOCKET; - ctx = calloc(1, sizeof(*ctx)); - if (!ctx) - goto error; + if (ctx->sock.fd >= 0) { + if (ctx->sock.registered) + uloop_fd_delete(&ctx->sock); + + close(ctx->sock.fd); + } ctx->sock.fd = usock(USOCK_UNIX, path, NULL); if (ctx->sock.fd < 0) - goto error_free; - - ctx->sock.cb = ubus_handle_data; + return UBUS_STATUS_CONNECTION_FAILED; if (read(ctx->sock.fd, &hdr, sizeof(hdr)) != sizeof(hdr)) - goto error_close; + goto out_close; if (!ubus_validate_hdr(&hdr.hdr)) - goto error_close; + goto out_close; if (hdr.hdr.type != UBUS_MSG_HELLO) - goto error_close; + goto out_close; buf = calloc(1, blob_raw_len(&hdr.data)); if (!buf) - goto error_close; + goto out_close; memcpy(buf, &hdr.data, sizeof(hdr.data)); if (read(ctx->sock.fd, blob_data(buf), blob_len(buf)) != blob_len(buf)) - goto error_free_buf; + goto out_free; ctx->local_id = hdr.hdr.peer; + if (!ctx->local_id) + goto out_free; + + ret = UBUS_STATUS_OK; + fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK); + + ubus_refresh_state(ctx); + +out_free: free(buf); +out_close: + if (ret) + close(ctx->sock.fd); + return ret; +} + +static void ubus_default_connection_lost(struct ubus_context *ctx) +{ + if (ctx->sock.registered) + uloop_end(); +} + +struct ubus_context *ubus_connect(const char *path) +{ + struct ubus_context *ctx; + + ctx = calloc(1, sizeof(*ctx)); + if (!ctx) + return NULL; + + ctx->sock.fd = -1; + ctx->sock.cb = ubus_handle_data; ctx->connection_lost = ubus_default_connection_lost; INIT_LIST_HEAD(&ctx->requests); + INIT_LIST_HEAD(&ctx->pending); avl_init(&ctx->objects, ubus_cmp_id, false, NULL); - - if (!ctx->local_id) - goto error_close; + if (ubus_reconnect(ctx, path)) { + free(ctx); + ctx = NULL; + } return ctx; - -error_free_buf: - free(buf); -error_close: - close(ctx->sock.fd); -error_free: - free(ctx); -error: - return NULL; } void ubus_free(struct ubus_context *ctx)