X-Git-Url: http://git.archive.openwrt.org/?p=project%2Fubus.git;a=blobdiff_plain;f=libubus.c;h=d52faff9d9cd11b573584268df21f776b6fbe379;hp=bcaf63dc377f4a158a1cda98b70997f69e077ff4;hb=619f3a160de4f417226b69039538882787b3811c;hpb=a6f52f058e52c7720afb05e5e4e51648e636b940;ds=sidebyside diff --git a/libubus.c b/libubus.c index bcaf63d..d52faff 100644 --- a/libubus.c +++ b/libubus.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011-2012 Felix Fietkau + * Copyright (C) 2011-2014 Felix Fietkau * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License version 2.1 @@ -38,19 +38,11 @@ const char *__ubus_strerror[__UBUS_STATUS_LAST] = { struct blob_buf b __hidden = {}; -struct ubus_pending_data { +struct ubus_pending_msg { struct list_head list; - int type; - struct blob_attr data[]; + struct ubus_msghdr_buf hdr; }; -struct ubus_pending_invoke { - struct list_head list; - struct ubus_msghdr hdr; -}; - -static void ubus_process_pending_invoke(struct ubus_context *ctx); - static int ubus_cmp_id(const void *k1, const void *k2, void *ptr) { const uint32_t *id1 = k1, *id2 = k2; @@ -78,259 +70,59 @@ out: return err; } -static bool ubus_get_status(struct ubus_msghdr *hdr, int *ret) -{ - struct blob_attr **attrbuf = ubus_parse_msg(hdr->data); - - if (!attrbuf[UBUS_ATTR_STATUS]) - return false; - - *ret = blob_get_u32(attrbuf[UBUS_ATTR_STATUS]); - return true; -} - -static void req_data_cb(struct ubus_request *req, int type, struct blob_attr *data) -{ - struct blob_attr **attr; - - if (req->raw_data_cb) - req->raw_data_cb(req, type, data); - - if (!req->data_cb) - return; - - attr = ubus_parse_msg(data); - req->data_cb(req, type, attr[UBUS_ATTR_DATA]); -} - -static void ubus_process_req_data(struct ubus_request *req) -{ - struct ubus_pending_data *data; - - while (!list_empty(&req->pending)) { - data = list_first_entry(&req->pending, - struct ubus_pending_data, list); - list_del(&data->list); - if (!req->cancelled) - req_data_cb(req, data->type, data->data); - free(data); - } -} - -static void ubus_req_complete_cb(struct ubus_request *req) -{ - ubus_complete_handler_t cb = req->complete_cb; - - if (!cb) - return; - - req->complete_cb = NULL; - cb(req, req->status_code); -} - -static int ubus_process_req_status(struct ubus_request *req, struct ubus_msghdr *hdr) +static void +ubus_queue_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf) { - int ret = UBUS_STATUS_INVALID_ARGUMENT; + struct ubus_pending_msg *pending; + void *data; - if (!list_empty(&req->list)) - list_del_init(&req->list); + pending = calloc_a(sizeof(*pending), &data, blob_raw_len(buf->data)); - ubus_get_status(hdr, &ret); - req->peer = hdr->peer; - req->status_msg = true; - req->status_code = ret; - if (!req->blocked) - ubus_req_complete_cb(req); - - return ret; -} - -static void ubus_req_data(struct ubus_request *req, struct ubus_msghdr *hdr) -{ - struct ubus_pending_data *data; - int len; - - if (!req->blocked) { - req->blocked = true; - req_data_cb(req, hdr->type, hdr->data); - ubus_process_req_data(req); - req->blocked = false; - - if (req->status_msg) - ubus_req_complete_cb(req); - - return; - } - - len = blob_raw_len(hdr->data); - data = calloc(1, sizeof(*data) + len); - if (!data) - return; - - data->type = hdr->type; - memcpy(data->data, hdr->data, len); - list_add(&data->list, &req->pending); -} - -static struct ubus_request *ubus_find_request(struct ubus_context *ctx, uint32_t seq, uint32_t peer) -{ - struct ubus_request *req; - - list_for_each_entry(req, &ctx->requests, list) { - if (seq != req->seq || peer != req->peer) - continue; - - return req; - } - return NULL; -} - -void ubus_complete_deferred_request(struct ubus_context *ctx, struct ubus_request_data *req, int ret) -{ - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_STATUS, ret); - blob_put_int32(&b, UBUS_ATTR_OBJID, req->object); - ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_STATUS, req->peer); + pending->hdr.data = data; + memcpy(&pending->hdr.hdr, &buf->hdr, sizeof(buf->hdr)); + memcpy(data, buf->data, blob_raw_len(buf->data)); + list_add_tail(&pending->list, &ctx->pending); + if (ctx->sock.registered) + uloop_timeout_set(&ctx->pending_timer, 1); } -void __hidden ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) +void __hidden +ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd) { - struct ubus_pending_invoke *pending; - struct ubus_request *req; - - switch(hdr->type) { + switch(buf->hdr.type) { case UBUS_MSG_STATUS: - req = ubus_find_request(ctx, hdr->seq, hdr->peer); - if (!req) - break; - - ubus_process_req_status(req, hdr); - break; - case UBUS_MSG_DATA: - req = ubus_find_request(ctx, hdr->seq, hdr->peer); - if (req && (req->data_cb || req->raw_data_cb)) - ubus_req_data(req, hdr); + ubus_process_req_msg(ctx, buf, fd); break; case UBUS_MSG_INVOKE: - if (ctx->stack_depth > 2) { - pending = calloc(1, sizeof(*pending) + - blob_raw_len(hdr->data)); - - if (!pending) - return; - - memcpy(&pending->hdr, hdr, sizeof(*hdr) + - blob_raw_len(hdr->data)); - list_add(&pending->list, &ctx->pending); - } else { - ubus_process_invoke(ctx, hdr); + case UBUS_MSG_UNSUBSCRIBE: + case UBUS_MSG_NOTIFY: + if (ctx->stack_depth) { + ubus_queue_msg(ctx, buf); + break; } + + ubus_process_obj_msg(ctx, buf); + break; + case UBUS_MSG_MONITOR: + if (ctx->monitor_cb) + ctx->monitor_cb(ctx, buf->hdr.seq, buf->data); break; } } -static void ubus_process_pending_invoke(struct ubus_context *ctx) +static void ubus_process_pending_msg(struct uloop_timeout *timeout) { - struct ubus_pending_invoke *pending, *tmp; + struct ubus_context *ctx = container_of(timeout, struct ubus_context, pending_timer); + struct ubus_pending_msg *pending; - list_for_each_entry_safe(pending, tmp, &ctx->pending, list) { + while (!ctx->stack_depth && !list_empty(&ctx->pending)) { + pending = list_first_entry(&ctx->pending, struct ubus_pending_msg, list); list_del(&pending->list); - ubus_process_msg(ctx, &pending->hdr); + ubus_process_msg(ctx, &pending->hdr, -1); free(pending); - if (ctx->stack_depth > 2) - break; - } -} - -void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) -{ - if (!list_empty(&req->list)) - return; - - req->cancelled = true; - ubus_process_req_data(req); - list_del_init(&req->list); -} - -void ubus_complete_request_async(struct ubus_context *ctx, struct ubus_request *req) -{ - if (!list_empty(&req->list)) - return; - - list_add(&req->list, &ctx->requests); -} - -static void ubus_sync_req_cb(struct ubus_request *req, int ret) -{ - req->status_msg = true; - req->status_code = ret; - uloop_end(); -} - -struct ubus_sync_req_cb { - struct uloop_timeout timeout; - struct ubus_request *req; -}; - -static void ubus_sync_req_timeout_cb(struct uloop_timeout *timeout) -{ - struct ubus_sync_req_cb *cb; - - cb = container_of(timeout, struct ubus_sync_req_cb, timeout); - ubus_sync_req_cb(cb->req, UBUS_STATUS_TIMEOUT); -} - -int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, - int timeout) -{ - struct ubus_sync_req_cb cb; - ubus_complete_handler_t complete_cb = req->complete_cb; - bool registered = ctx->sock.registered; - int status = UBUS_STATUS_NO_DATA; - - if (!registered) { - uloop_init(); - ubus_add_uloop(ctx); } - - if (timeout) { - memset(&cb, 0, sizeof(cb)); - cb.req = req; - cb.timeout.cb = ubus_sync_req_timeout_cb; - uloop_timeout_set(&cb.timeout, timeout); - } - - ubus_complete_request_async(ctx, req); - req->complete_cb = ubus_sync_req_cb; - - ctx->stack_depth++; - while (!req->status_msg) { - bool cancelled = uloop_cancelled; - uloop_cancelled = false; - uloop_run(); - uloop_cancelled = cancelled; - } - ctx->stack_depth--; - - if (timeout) - uloop_timeout_cancel(&cb.timeout); - - if (req->status_msg) - status = req->status_code; - - req->complete_cb = complete_cb; - if (req->complete_cb) - req->complete_cb(req, status); - - if (!registered) - uloop_fd_delete(&ctx->sock); - - if (!ctx->stack_depth) - ubus_process_pending_invoke(ctx); - - return status; } struct ubus_lookup_request { @@ -359,22 +151,6 @@ static void ubus_lookup_cb(struct ubus_request *ureq, int type, struct blob_attr req->cb(ureq->ctx, &obj, ureq->priv); } -int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, - struct blob_attr *msg, int cmd, uint32_t peer) -{ - memset(req, 0, sizeof(*req)); - - if (msg && blob_pad_len(msg) > UBUS_MAX_MSGLEN) - return -1; - - INIT_LIST_HEAD(&req->list); - INIT_LIST_HEAD(&req->pending); - req->ctx = ctx; - req->peer = peer; - req->seq = ++ctx->request_seq; - return ubus_send_msg(ctx, req->seq, msg, cmd, peer); -} - int ubus_lookup(struct ubus_context *ctx, const char *path, ubus_lookup_handler_t cb, void *priv) { @@ -423,50 +199,6 @@ int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id) return ubus_complete_request(ctx, &req, 0); } -int ubus_send_reply(struct ubus_context *ctx, struct ubus_request_data *req, - struct blob_attr *msg) -{ - int ret; - - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, req->object); - blob_put(&b, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg)); - ret = ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_DATA, req->peer); - if (ret < 0) - return UBUS_STATUS_NO_DATA; - - return 0; -} - -int ubus_invoke_async(struct ubus_context *ctx, uint32_t obj, const char *method, - struct blob_attr *msg, struct ubus_request *req) -{ - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, obj); - blob_put_string(&b, UBUS_ATTR_METHOD, method); - if (msg) - blob_put(&b, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg)); - - if (ubus_start_request(ctx, req, b.head, UBUS_MSG_INVOKE, obj) < 0) - return UBUS_STATUS_INVALID_ARGUMENT; - - return 0; -} - -int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method, - struct blob_attr *msg, ubus_data_handler_t cb, void *priv, - int timeout) -{ - struct ubus_request req; - - ubus_invoke_async(ctx, obj, method, msg, &req); - req.data_cb = cb; - req.priv = priv; - return ubus_complete_request(ctx, &req, timeout); -} - - - static int ubus_event_cb(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) @@ -510,109 +242,101 @@ int ubus_register_event_handler(struct ubus_context *ctx, if (pattern) blobmsg_add_string(&b2, "pattern", pattern); - return ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_EVENT, "register", b2.head, + ret = ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_EVENT, "register", b2.head, NULL, NULL, 0); -} - -enum { - WATCH_ID, - WATCH_NOTIFY, - __WATCH_MAX -}; - -static const struct blobmsg_policy watch_policy[] = { - [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, - [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING }, -}; + blob_buf_free(&b2); + return ret; +} -static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj, - struct ubus_request_data *req, - const char *method, struct blob_attr *msg) +int ubus_send_event(struct ubus_context *ctx, const char *id, + struct blob_attr *data) { - struct ubus_watch_object *w; - struct blob_attr *tb[__WATCH_MAX]; - - blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg)); + struct ubus_request req; + void *s; - if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY]) - return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, UBUS_SYSTEM_OBJECT_EVENT); + blob_put_string(&b, UBUS_ATTR_METHOD, "send"); + s = blob_nest_start(&b, UBUS_ATTR_DATA); + blobmsg_add_string(&b, "id", id); + blobmsg_add_field(&b, BLOBMSG_TYPE_TABLE, "data", blob_data(data), blob_len(data)); + blob_nest_end(&b, s); - if (req->peer) + if (ubus_start_request(ctx, &req, b.head, UBUS_MSG_INVOKE, UBUS_SYSTEM_OBJECT_EVENT) < 0) return UBUS_STATUS_INVALID_ARGUMENT; - w = container_of(obj, struct ubus_watch_object, obj); - w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID])); - return 0; + return ubus_complete_request(ctx, &req, 0); } -static const struct ubus_method watch_method = { - .name = NULL, - .handler = ubus_watch_cb, -}; - -int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj) +static void ubus_default_connection_lost(struct ubus_context *ctx) { - struct ubus_object *obj = &w_obj->obj; - - obj->methods = &watch_method; - obj->n_methods = 1; - - return ubus_add_object(ctx, obj); + if (ctx->sock.registered) + uloop_end(); } -static int -__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type) +int ubus_connect_ctx(struct ubus_context *ctx, const char *path) { - struct ubus_request req; + memset(ctx, 0, sizeof(*ctx)); - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); - blob_put_int32(&b, UBUS_ATTR_TARGET, id); - if (method) - blob_put_string(&b, UBUS_ATTR_METHOD, method); + ctx->sock.fd = -1; + ctx->sock.cb = ubus_handle_data; + ctx->connection_lost = ubus_default_connection_lost; + ctx->pending_timer.cb = ubus_process_pending_msg; - if (ubus_start_request(ctx, &req, b.head, type, 0) < 0) - return UBUS_STATUS_INVALID_ARGUMENT; + ctx->msgbuf.data = calloc(UBUS_MSG_CHUNK_SIZE, sizeof(char)); + if (!ctx->msgbuf.data) + return -1; + ctx->msgbuf_data_len = UBUS_MSG_CHUNK_SIZE; - return ubus_complete_request(ctx, &req, 0); + INIT_LIST_HEAD(&ctx->requests); + INIT_LIST_HEAD(&ctx->pending); + avl_init(&ctx->objects, ubus_cmp_id, false, NULL); + if (ubus_reconnect(ctx, path)) { + free(ctx->msgbuf.data); + return -1; + } + return 0; } -int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +static void ubus_auto_reconnect_cb(struct uloop_timeout *timeout) { - return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH); -} + struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer); -int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) -{ - return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH); + if (!ubus_reconnect(&conn->ctx, conn->path)) + ubus_add_uloop(&conn->ctx); + else + uloop_timeout_set(timeout, 1000); } -int ubus_send_event(struct ubus_context *ctx, const char *id, - struct blob_attr *data) +static void ubus_auto_disconnect_cb(struct ubus_context *ctx) { - struct ubus_request req; - void *s; + struct ubus_auto_conn *conn = container_of(ctx, struct ubus_auto_conn, ctx); - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, UBUS_SYSTEM_OBJECT_EVENT); - blob_put_string(&b, UBUS_ATTR_METHOD, "send"); - s = blob_nest_start(&b, UBUS_ATTR_DATA); - blobmsg_add_string(&b, "id", id); - blobmsg_add_field(&b, BLOBMSG_TYPE_TABLE, "data", blob_data(data), blob_len(data)); - blob_nest_end(&b, s); + conn->timer.cb = ubus_auto_reconnect_cb; + uloop_timeout_set(&conn->timer, 1000); +} - if (ubus_start_request(ctx, &req, b.head, UBUS_MSG_INVOKE, UBUS_SYSTEM_OBJECT_EVENT) < 0) - return UBUS_STATUS_INVALID_ARGUMENT; +static void ubus_auto_connect_cb(struct uloop_timeout *timeout) +{ + struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer); - return ubus_complete_request(ctx, &req, 0); + if (ubus_connect_ctx(&conn->ctx, conn->path)) { + uloop_timeout_set(timeout, 1000); + fprintf(stderr, "failed to connect to ubus\n"); + return; + } + conn->ctx.connection_lost = ubus_auto_disconnect_cb; + if (conn->cb) + conn->cb(&conn->ctx); + ubus_add_uloop(&conn->ctx); } -static void ubus_default_connection_lost(struct ubus_context *ctx) +void ubus_auto_connect(struct ubus_auto_conn *conn) { - if (ctx->sock.registered) - uloop_end(); + conn->timer.cb = ubus_auto_connect_cb; + ubus_auto_connect_cb(&conn->timer); } struct ubus_context *ubus_connect(const char *path) @@ -623,14 +347,7 @@ struct ubus_context *ubus_connect(const char *path) if (!ctx) return NULL; - ctx->sock.fd = -1; - ctx->sock.cb = ubus_handle_data; - ctx->connection_lost = ubus_default_connection_lost; - - INIT_LIST_HEAD(&ctx->requests); - INIT_LIST_HEAD(&ctx->pending); - avl_init(&ctx->objects, ubus_cmp_id, false, NULL); - if (ubus_reconnect(ctx, path)) { + if (ubus_connect_ctx(ctx, path)) { free(ctx); ctx = NULL; } @@ -638,8 +355,17 @@ struct ubus_context *ubus_connect(const char *path) return ctx; } -void ubus_free(struct ubus_context *ctx) +void ubus_shutdown(struct ubus_context *ctx) { + blob_buf_free(&b); + if (!ctx) + return; close(ctx->sock.fd); + free(ctx->msgbuf.data); +} + +void ubus_free(struct ubus_context *ctx) +{ + ubus_shutdown(ctx); free(ctx); }