X-Git-Url: http://git.archive.openwrt.org/?p=project%2Fubus.git;a=blobdiff_plain;f=libubus.c;h=be4e6acdd797480225631b45d70e27909bbe0bcb;hp=03899d6db44e766513053e0eb65a5acee27cc6ee;hb=6280cbaf0966928c4f83b925fb5b94c3ecb1936e;hpb=d366a6de839087d8a17e5855c14ae55e95b13c65 diff --git a/libubus.c b/libubus.c index 03899d6..be4e6ac 100644 --- a/libubus.c +++ b/libubus.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011-2012 Felix Fietkau + * Copyright (C) 2011-2014 Felix Fietkau * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License version 2.1 @@ -38,19 +38,11 @@ const char *__ubus_strerror[__UBUS_STATUS_LAST] = { struct blob_buf b __hidden = {}; -struct ubus_pending_data { +struct ubus_pending_msg { struct list_head list; - int type; - struct blob_attr data[]; + struct ubus_msghdr_buf hdr; }; -struct ubus_pending_invoke { - struct list_head list; - struct ubus_msghdr hdr; -}; - -static void ubus_process_pending_invoke(struct ubus_context *ctx); - static int ubus_cmp_id(const void *k1, const void *k2, void *ptr) { const uint32_t *id1 = k1, *id2 = k2; @@ -78,265 +70,62 @@ out: return err; } -static bool ubus_get_status(struct ubus_msghdr *hdr, int *ret) +static void +ubus_queue_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) { - struct blob_attr **attrbuf = ubus_parse_msg(hdr->data); - - if (!attrbuf[UBUS_ATTR_STATUS]) - return false; - - *ret = blob_get_u32(attrbuf[UBUS_ATTR_STATUS]); - return true; -} - -static void req_data_cb(struct ubus_request *req, int type, struct blob_attr *data) -{ - struct blob_attr **attr; - - if (req->raw_data_cb) - req->raw_data_cb(req, type, data); + struct ubus_pending_msg *pending; - if (!req->data_cb) + pending = calloc(1, sizeof(*pending)); + if (!pending) return; - - attr = ubus_parse_msg(data); - req->data_cb(req, type, attr[UBUS_ATTR_DATA]); -} - -static void ubus_process_req_data(struct ubus_request *req) -{ - struct ubus_pending_data *data; - - while (!list_empty(&req->pending)) { - data = list_first_entry(&req->pending, - struct ubus_pending_data, list); - list_del(&data->list); - if (!req->cancelled) - req_data_cb(req, data->type, data->data); - free(data); - } -} - -static void ubus_req_complete_cb(struct ubus_request *req) -{ - ubus_complete_handler_t cb = req->complete_cb; - - if (!cb) + pending->hdr.data = calloc(1, blob_raw_len(ubus_msghdr_data(hdr))); + if (!pending->hdr.data) return; - req->complete_cb = NULL; - cb(req, req->status_code); -} - -static int ubus_process_req_status(struct ubus_request *req, struct ubus_msghdr *hdr) -{ - int ret = UBUS_STATUS_INVALID_ARGUMENT; - - if (!list_empty(&req->list)) - list_del_init(&req->list); - - ubus_get_status(hdr, &ret); - req->peer = hdr->peer; - req->status_msg = true; - req->status_code = ret; - if (!req->blocked) - ubus_req_complete_cb(req); - - return ret; -} - -static void ubus_req_data(struct ubus_request *req, struct ubus_msghdr *hdr) -{ - struct ubus_pending_data *data; - int len; - - if (!req->blocked) { - req->blocked = true; - req_data_cb(req, hdr->type, hdr->data); - ubus_process_req_data(req); - req->blocked = false; - - if (req->status_msg) - ubus_req_complete_cb(req); - - return; - } - - len = blob_raw_len(hdr->data); - data = calloc(1, sizeof(*data) + len); - if (!data) - return; - - data->type = hdr->type; - memcpy(data->data, hdr->data, len); - list_add(&data->list, &req->pending); -} - -static struct ubus_request *ubus_find_request(struct ubus_context *ctx, uint32_t seq, uint32_t peer) -{ - struct ubus_request *req; - - list_for_each_entry(req, &ctx->requests, list) { - if (seq != req->seq || peer != req->peer) - continue; - - return req; - } - return NULL; -} - -void ubus_complete_deferred_request(struct ubus_context *ctx, struct ubus_request_data *req, int ret) -{ - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_STATUS, ret); - blob_put_int32(&b, UBUS_ATTR_OBJID, req->object); - ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_STATUS, req->peer); + memcpy(&pending->hdr.hdr, hdr, sizeof(*hdr)); + memcpy(pending->hdr.data, ubus_msghdr_data(hdr), blob_raw_len(ubus_msghdr_data(hdr))); + list_add(&pending->list, &ctx->pending); + if (ctx->sock.registered) + uloop_timeout_set(&ctx->pending_timer, 1); } -void __hidden ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) +void __hidden +ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr, int fd) { - struct ubus_pending_invoke *pending; - struct ubus_request *req; switch(hdr->type) { case UBUS_MSG_STATUS: - req = ubus_find_request(ctx, hdr->seq, hdr->peer); - if (!req) - break; - - ubus_process_req_status(req, hdr); - break; - case UBUS_MSG_DATA: - req = ubus_find_request(ctx, hdr->seq, hdr->peer); - if (req && (req->data_cb || req->raw_data_cb)) - ubus_req_data(req, hdr); + ubus_process_req_msg(ctx, hdr, fd); break; case UBUS_MSG_INVOKE: - if (ctx->stack_depth > 2) { - pending = calloc(1, sizeof(*pending) + - blob_raw_len(hdr->data)); - - if (!pending) - return; - - memcpy(&pending->hdr, hdr, sizeof(*hdr) + - blob_raw_len(hdr->data)); - list_add(&pending->list, &ctx->pending); - } else { - ubus_process_invoke(ctx, hdr); + case UBUS_MSG_UNSUBSCRIBE: + case UBUS_MSG_NOTIFY: + if (ctx->stack_depth) { + ubus_queue_msg(ctx, hdr); + break; } - break; - case UBUS_MSG_UNSUBSCRIBE: - ubus_process_unsubscribe(ctx, hdr); + ubus_process_obj_msg(ctx, hdr); break; } } -static void ubus_process_pending_invoke(struct ubus_context *ctx) +static void ubus_process_pending_msg(struct uloop_timeout *timeout) { - struct ubus_pending_invoke *pending, *tmp; + struct ubus_context *ctx = container_of(timeout, struct ubus_context, pending_timer); + struct ubus_pending_msg *pending; - list_for_each_entry_safe(pending, tmp, &ctx->pending, list) { + while (!ctx->stack_depth && !list_empty(&ctx->pending)) { + pending = list_first_entry(&ctx->pending, struct ubus_pending_msg, list); list_del(&pending->list); - ubus_process_msg(ctx, &pending->hdr); + ubus_process_msg(ctx, &pending->hdr.hdr, -1); + free(pending->hdr.data); free(pending); - if (ctx->stack_depth > 2) - break; } } -void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) -{ - if (!list_empty(&req->list)) - return; - - req->cancelled = true; - ubus_process_req_data(req); - list_del_init(&req->list); -} - -void ubus_complete_request_async(struct ubus_context *ctx, struct ubus_request *req) -{ - if (!list_empty(&req->list)) - return; - - list_add(&req->list, &ctx->requests); -} - -static void ubus_sync_req_cb(struct ubus_request *req, int ret) -{ - req->status_msg = true; - req->status_code = ret; - uloop_end(); -} - -struct ubus_sync_req_cb { - struct uloop_timeout timeout; - struct ubus_request *req; -}; - -static void ubus_sync_req_timeout_cb(struct uloop_timeout *timeout) -{ - struct ubus_sync_req_cb *cb; - - cb = container_of(timeout, struct ubus_sync_req_cb, timeout); - ubus_sync_req_cb(cb->req, UBUS_STATUS_TIMEOUT); -} - -int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, - int timeout) -{ - struct ubus_sync_req_cb cb; - ubus_complete_handler_t complete_cb = req->complete_cb; - bool registered = ctx->sock.registered; - int status = UBUS_STATUS_NO_DATA; - - if (!registered) { - uloop_init(); - ubus_add_uloop(ctx); - } - - if (timeout) { - memset(&cb, 0, sizeof(cb)); - cb.req = req; - cb.timeout.cb = ubus_sync_req_timeout_cb; - uloop_timeout_set(&cb.timeout, timeout); - } - - ubus_complete_request_async(ctx, req); - req->complete_cb = ubus_sync_req_cb; - - ctx->stack_depth++; - while (!req->status_msg) { - bool cancelled = uloop_cancelled; - uloop_cancelled = false; - uloop_run(); - uloop_cancelled = cancelled; - } - ctx->stack_depth--; - - if (timeout) - uloop_timeout_cancel(&cb.timeout); - - if (req->status_msg) - status = req->status_code; - - req->complete_cb = complete_cb; - if (req->complete_cb) - req->complete_cb(req, status); - - if (!registered) - uloop_fd_delete(&ctx->sock); - - if (!ctx->stack_depth) - ubus_process_pending_invoke(ctx); - - return status; -} - struct ubus_lookup_request { struct ubus_request req; ubus_lookup_handler_t cb; @@ -363,22 +152,6 @@ static void ubus_lookup_cb(struct ubus_request *ureq, int type, struct blob_attr req->cb(ureq->ctx, &obj, ureq->priv); } -int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, - struct blob_attr *msg, int cmd, uint32_t peer) -{ - memset(req, 0, sizeof(*req)); - - if (msg && blob_pad_len(msg) > UBUS_MAX_MSGLEN) - return -1; - - INIT_LIST_HEAD(&req->list); - INIT_LIST_HEAD(&req->pending); - req->ctx = ctx; - req->peer = peer; - req->seq = ++ctx->request_seq; - return ubus_send_msg(ctx, req->seq, msg, cmd, peer); -} - int ubus_lookup(struct ubus_context *ctx, const char *path, ubus_lookup_handler_t cb, void *priv) { @@ -427,50 +200,6 @@ int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id) return ubus_complete_request(ctx, &req, 0); } -int ubus_send_reply(struct ubus_context *ctx, struct ubus_request_data *req, - struct blob_attr *msg) -{ - int ret; - - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, req->object); - blob_put(&b, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg)); - ret = ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_DATA, req->peer); - if (ret < 0) - return UBUS_STATUS_NO_DATA; - - return 0; -} - -int ubus_invoke_async(struct ubus_context *ctx, uint32_t obj, const char *method, - struct blob_attr *msg, struct ubus_request *req) -{ - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, obj); - blob_put_string(&b, UBUS_ATTR_METHOD, method); - if (msg) - blob_put(&b, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg)); - - if (ubus_start_request(ctx, req, b.head, UBUS_MSG_INVOKE, obj) < 0) - return UBUS_STATUS_INVALID_ARGUMENT; - - return 0; -} - -int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method, - struct blob_attr *msg, ubus_data_handler_t cb, void *priv, - int timeout) -{ - struct ubus_request req; - - ubus_invoke_async(ctx, obj, method, msg, &req); - req.data_cb = cb; - req.priv = priv; - return ubus_complete_request(ctx, &req, timeout); -} - - - static int ubus_event_cb(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) @@ -544,22 +273,77 @@ static void ubus_default_connection_lost(struct ubus_context *ctx) uloop_end(); } -struct ubus_context *ubus_connect(const char *path) +static int _ubus_connect(struct ubus_context *ctx, const char *path) { - struct ubus_context *ctx; - - ctx = calloc(1, sizeof(*ctx)); - if (!ctx) - return NULL; - ctx->sock.fd = -1; ctx->sock.cb = ubus_handle_data; ctx->connection_lost = ubus_default_connection_lost; + ctx->pending_timer.cb = ubus_process_pending_msg; + + ctx->msgbuf.data = calloc(UBUS_MSG_CHUNK_SIZE, sizeof(char)); + if (!ctx->msgbuf.data) + return -1; + ctx->msgbuf_data_len = UBUS_MSG_CHUNK_SIZE; INIT_LIST_HEAD(&ctx->requests); INIT_LIST_HEAD(&ctx->pending); avl_init(&ctx->objects, ubus_cmp_id, false, NULL); if (ubus_reconnect(ctx, path)) { + free(ctx->msgbuf.data); + return -1; + } + + return 0; +} + +static void ubus_auto_reconnect_cb(struct uloop_timeout *timeout) +{ + struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer); + + if (!ubus_reconnect(&conn->ctx, conn->path)) + ubus_add_uloop(&conn->ctx); + else + uloop_timeout_set(timeout, 1000); +} + +static void ubus_auto_disconnect_cb(struct ubus_context *ctx) +{ + struct ubus_auto_conn *conn = container_of(ctx, struct ubus_auto_conn, ctx); + + conn->timer.cb = ubus_auto_reconnect_cb; + uloop_timeout_set(&conn->timer, 1000); +} + +static void ubus_auto_connect_cb(struct uloop_timeout *timeout) +{ + struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer); + + if (_ubus_connect(&conn->ctx, conn->path)) { + uloop_timeout_set(timeout, 1000); + fprintf(stderr, "failed to connect to ubus\n"); + return; + } + conn->ctx.connection_lost = ubus_auto_disconnect_cb; + if (conn->cb) + conn->cb(&conn->ctx); + ubus_add_uloop(&conn->ctx); +} + +void ubus_auto_connect(struct ubus_auto_conn *conn) +{ + conn->timer.cb = ubus_auto_connect_cb; + ubus_auto_connect_cb(&conn->timer); +} + +struct ubus_context *ubus_connect(const char *path) +{ + struct ubus_context *ctx; + + ctx = calloc(1, sizeof(*ctx)); + if (!ctx) + return NULL; + + if (_ubus_connect(ctx, path)) { free(ctx); ctx = NULL; } @@ -569,6 +353,8 @@ struct ubus_context *ubus_connect(const char *path) void ubus_free(struct ubus_context *ctx) { + blob_buf_free(&b); close(ctx->sock.fd); + free(ctx->msgbuf.data); free(ctx); }