split event sending from event forwarding
[project/ubus.git] / ubusd.c
1 #include <sys/socket.h>
2 #include <sys/uio.h>
3 #include <signal.h>
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <fcntl.h>
7
8 #include <libubox/blob.h>
9 #include <libubox/uloop.h>
10 #include <libubox/usock.h>
11 #include <libubox/list.h>
12
13 #include "ubusd.h"
14
15 static struct ubus_msg_buf *ubus_msg_unshare(struct ubus_msg_buf *ub)
16 {
17         ub = realloc(ub, sizeof(*ub) + ub->len);
18         if (!ub)
19                 return NULL;
20
21         ub->refcount = 1;
22         memcpy(ub + 1, ub->data, ub->len);
23         ub->data = (void *) (ub + 1);
24         return ub;
25 }
26
27 static struct ubus_msg_buf *ubus_msg_ref(struct ubus_msg_buf *ub)
28 {
29         if (ub->refcount == ~0)
30                 return ubus_msg_unshare(ub);
31
32         ub->refcount++;
33         return ub;
34 }
35
36 struct ubus_msg_buf *ubus_msg_new(void *data, int len, bool shared)
37 {
38         struct ubus_msg_buf *ub;
39         int buflen = sizeof(*ub);
40
41         if (!shared)
42                 buflen += len;
43
44         ub = calloc(1, buflen);
45         if (!ub)
46                 return NULL;
47
48         if (shared) {
49                 ub->refcount = ~0;
50                 ub->data = data;
51         } else {
52                 ub->refcount = 1;
53                 ub->data = (void *) (ub + 1);
54                 if (data)
55                         memcpy(ub + 1, data, len);
56         }
57
58         ub->len = len;
59         return ub;
60 }
61
62 void ubus_msg_free(struct ubus_msg_buf *ub)
63 {
64         switch (ub->refcount) {
65         case 1:
66         case ~0:
67                 free(ub);
68                 break;
69         default:
70                 ub->refcount--;
71                 break;
72         }
73 }
74
75 static int ubus_msg_writev(int fd, struct ubus_msg_buf *ub, int offset)
76 {
77         struct iovec iov[2];
78
79         if (offset < sizeof(ub->hdr)) {
80                 iov[0].iov_base = ((char *) &ub->hdr) + offset;
81                 iov[0].iov_len = sizeof(ub->hdr) - offset;
82                 iov[1].iov_base = (char *) ub->data;
83                 iov[1].iov_len = ub->len;
84                 return writev(fd, iov, 2);
85         } else {
86                 offset -= sizeof(ub->hdr);
87                 return write(fd, ((char *) ub->data) + offset, ub->len - offset);
88         }
89 }
90
91 static void ubus_msg_enqueue(struct ubus_client *cl, struct ubus_msg_buf *ub)
92 {
93         if (cl->tx_queue[cl->txq_tail])
94                 return;
95
96         cl->tx_queue[cl->txq_tail] = ubus_msg_ref(ub);
97         cl->txq_tail = (cl->txq_tail + 1) % ARRAY_SIZE(cl->tx_queue);
98 }
99
100 /* takes the msgbuf reference */
101 void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub, bool free)
102 {
103         int written;
104
105         if (!cl->tx_queue[cl->txq_cur]) {
106                 written = ubus_msg_writev(cl->sock.fd, ub, 0);
107                 if (written >= ub->len + sizeof(ub->hdr))
108                         goto out;
109
110                 if (written < 0)
111                         written = 0;
112
113                 cl->txq_ofs = written;
114
115                 /* get an event once we can write to the socket again */
116                 uloop_fd_add(&cl->sock, ULOOP_READ | ULOOP_WRITE | ULOOP_EDGE_TRIGGER);
117         }
118         ubus_msg_enqueue(cl, ub);
119
120 out:
121         if (free)
122                 ubus_msg_free(ub);
123 }
124
125 static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl)
126 {
127         return cl->tx_queue[cl->txq_cur];
128 }
129
130 static void ubus_msg_dequeue(struct ubus_client *cl)
131 {
132         struct ubus_msg_buf *ub = ubus_msg_head(cl);
133
134         if (!ub)
135                 return;
136
137         ubus_msg_free(ub);
138         cl->txq_ofs = 0;
139         cl->tx_queue[cl->txq_cur] = NULL;
140         cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue);
141 }
142
143 static void handle_client_disconnect(struct ubus_client *cl)
144 {
145         while (ubus_msg_head(cl))
146                 ubus_msg_dequeue(cl);
147
148         ubusd_proto_free_client(cl);
149         uloop_fd_delete(&cl->sock);
150         close(cl->sock.fd);
151         free(cl);
152 }
153
154 static void client_cb(struct uloop_fd *sock, unsigned int events)
155 {
156         struct ubus_client *cl = container_of(sock, struct ubus_client, sock);
157         struct ubus_msg_buf *ub;
158
159         /* first try to tx more pending data */
160         while ((ub = ubus_msg_head(cl))) {
161                 int written;
162
163                 written = ubus_msg_writev(sock->fd, ub, cl->txq_ofs);
164                 if (written < 0) {
165                         switch(errno) {
166                         case EINTR:
167                         case EAGAIN:
168                                 break;
169                         default:
170                                 goto disconnect;
171                         }
172                         break;
173                 }
174
175                 cl->txq_ofs += written;
176                 if (cl->txq_ofs < ub->len + sizeof(ub->hdr))
177                         break;
178
179                 ubus_msg_dequeue(cl);
180         }
181
182         /* prevent further ULOOP_WRITE events if we don't have data
183          * to send anymore */
184         if (!ubus_msg_head(cl) && (events & ULOOP_WRITE))
185                 uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
186
187 retry:
188         if (!sock->eof && cl->pending_msg_offset < sizeof(cl->hdrbuf)) {
189                 int offset = cl->pending_msg_offset;
190                 int bytes;
191
192                 bytes = read(sock->fd, (char *)&cl->hdrbuf + offset, sizeof(cl->hdrbuf) - offset);
193                 if (bytes < 0)
194                         goto out;
195
196                 cl->pending_msg_offset += bytes;
197                 if (cl->pending_msg_offset < sizeof(cl->hdrbuf))
198                         goto out;
199
200                 if (blob_pad_len(&cl->hdrbuf.data) > UBUS_MAX_MSGLEN)
201                         goto disconnect;
202
203                 cl->pending_msg = ubus_msg_new(NULL, blob_raw_len(&cl->hdrbuf.data), false);
204                 if (!cl->pending_msg)
205                         goto disconnect;
206
207                 memcpy(&cl->pending_msg->hdr, &cl->hdrbuf.hdr, sizeof(cl->hdrbuf.hdr));
208                 memcpy(cl->pending_msg->data, &cl->hdrbuf.data, sizeof(cl->hdrbuf.data));
209         }
210
211         ub = cl->pending_msg;
212         if (ub) {
213                 int offset = cl->pending_msg_offset - sizeof(ub->hdr);
214                 int len = blob_raw_len(ub->data) - offset;
215                 int bytes = 0;
216
217                 if (len > 0) {
218                         bytes = read(sock->fd, (char *) ub->data + offset, len);
219                         if (bytes <= 0)
220                                 goto out;
221                 }
222
223                 if (bytes < len) {
224                         cl->pending_msg_offset += bytes;
225                         goto out;
226                 }
227
228                 /* accept message */
229                 cl->pending_msg_offset = 0;
230                 cl->pending_msg = NULL;
231                 ubusd_proto_receive_message(cl, ub);
232                 goto retry;
233         }
234
235 out:
236         if (!sock->eof || ubus_msg_head(cl))
237                 return;
238
239 disconnect:
240         handle_client_disconnect(cl);
241 }
242
243 static bool get_next_connection(int fd)
244 {
245         struct ubus_client *cl;
246         int client_fd;
247
248         client_fd = accept(fd, NULL, 0);
249         if (client_fd < 0) {
250                 switch (errno) {
251                 case ECONNABORTED:
252                 case EINTR:
253                         return true;
254                 default:
255                         return false;
256                 }
257         }
258
259         cl = ubusd_proto_new_client(client_fd, client_cb);
260         if (cl)
261                 uloop_fd_add(&cl->sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
262         else
263                 close(client_fd);
264
265         return true;
266 }
267
268 static void server_cb(struct uloop_fd *fd, unsigned int events)
269 {
270         bool next;
271
272         do {
273                 next = get_next_connection(fd->fd);
274         } while (next);
275 }
276
277 static struct uloop_fd server_fd = {
278         .cb = server_cb,
279 };
280
281 static int usage(const char *progname)
282 {
283         fprintf(stderr, "Usage: %s [<options>]\n"
284                 "Options: \n"
285                 "  -s <socket>:         Set the unix domain socket to listen on\n"
286                 "\n", progname);
287         return 1;
288 }
289
290 int main(int argc, char **argv)
291 {
292         const char *ubus_socket = UBUS_UNIX_SOCKET;
293         int ret = 0;
294         int ch;
295
296         signal(SIGPIPE, SIG_IGN);
297
298         uloop_init();
299
300         while ((ch = getopt(argc, argv, "s:")) != -1) {
301                 switch (ch) {
302                 case 's':
303                         ubus_socket = optarg;
304                         break;
305                 default:
306                         return usage(argv[0]);
307                 }
308         }
309
310         unlink(ubus_socket);
311         server_fd.fd = usock(USOCK_UNIX | USOCK_SERVER | USOCK_NONBLOCK, ubus_socket, NULL);
312         if (server_fd.fd < 0) {
313                 perror("usock");
314                 ret = -1;
315                 goto out;
316         }
317         uloop_fd_add(&server_fd, ULOOP_READ | ULOOP_EDGE_TRIGGER);
318
319         uloop_run();
320         unlink(ubus_socket);
321
322 out:
323         uloop_done();
324         return ret;
325 }