The data contained in this repository can be downloaded to your computer using one of several clients.
Please see the documentation of your version control software client for more information.
Please select the desired protocol below to get the URL.
This URL has Read-Only access.
main_repo / deps / liboi / oi_socket.c @ 40c0f755
History | View | Annotate | Download (21.2 KB)
1 |
#include <stdio.h> |
---|---|
2 |
#include <stdlib.h> |
3 |
#include <assert.h> |
4 |
#include <unistd.h> /* close() */ |
5 |
#include <fcntl.h> /* fcntl() */ |
6 |
#include <errno.h> /* for the default methods */ |
7 |
#include <string.h> /* memset */ |
8 |
|
9 |
#include <netinet/tcp.h> /* TCP_NODELAY */ |
10 |
|
11 |
#include <ev.h> |
12 |
#include <oi_socket.h> |
13 |
|
14 |
#if HAVE_GNUTLS
|
15 |
# include <gnutls/gnutls.h> |
16 |
# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1) |
17 |
# define GNUTLS_NEED_READ (gnutls_record_get_direction(socket->session) == 0) |
18 |
#endif
|
19 |
|
20 |
#undef TRUE
|
21 |
#define TRUE 1 |
22 |
#undef FALSE
|
23 |
#define FALSE 0 |
24 |
#undef MIN
|
25 |
#define MIN(a,b) ((a) < (b) ? (a) : (b))
|
26 |
|
27 |
#define OKAY 0 |
28 |
#define AGAIN 1 |
29 |
#define ERROR 2 |
30 |
|
31 |
#define RAISE_ERROR(s, _domain, _code) do { \ |
32 |
if(s->on_error) { \
|
33 |
struct oi_error __oi_error; \
|
34 |
__oi_error.domain = _domain; \ |
35 |
__oi_error.code = _code; \ |
36 |
s->on_error(s, __oi_error); \ |
37 |
} \ |
38 |
} while(0) \ |
39 |
|
40 |
static int |
41 |
full_close(oi_socket *socket) |
42 |
{ |
43 |
if(-1 == close(socket->fd) && errno == EINTR) { |
44 |
/* TODO fd still open. next loop call close again? */
|
45 |
assert(0 && "implement me"); |
46 |
return ERROR;
|
47 |
} |
48 |
|
49 |
socket->read_action = NULL;
|
50 |
socket->write_action = NULL;
|
51 |
|
52 |
if(socket->loop) {
|
53 |
ev_feed_event(socket->loop, &socket->read_watcher, EV_READ); |
54 |
} |
55 |
return OKAY;
|
56 |
} |
57 |
|
58 |
static int |
59 |
half_close(oi_socket *socket) |
60 |
{ |
61 |
int r = shutdown(socket->fd, SHUT_WR);
|
62 |
|
63 |
if(r == -1) { |
64 |
RAISE_ERROR(socket, OI_ERROR_SHUTDOWN, errno); |
65 |
return ERROR;
|
66 |
} |
67 |
|
68 |
socket->write_action = NULL;
|
69 |
|
70 |
/* TODO set timer to zero so we get a callback soon */
|
71 |
return OKAY;
|
72 |
} |
73 |
|
74 |
static void |
75 |
update_write_buffer_after_send(oi_socket *socket, ssize_t sent) |
76 |
{ |
77 |
oi_queue *q = oi_queue_last(&socket->out_stream); |
78 |
oi_buf *to_write = oi_queue_data(q, oi_buf, queue); |
79 |
to_write->written += sent; |
80 |
socket->written += sent; |
81 |
|
82 |
if(to_write->written == to_write->len) {
|
83 |
|
84 |
oi_queue_remove(q); |
85 |
|
86 |
if(to_write->release) {
|
87 |
to_write->release(to_write); |
88 |
} |
89 |
|
90 |
if(oi_queue_empty(&socket->out_stream)) {
|
91 |
ev_io_stop(socket->loop, &socket->write_watcher); |
92 |
if(socket->on_drain)
|
93 |
socket->on_drain(socket); |
94 |
} |
95 |
} |
96 |
} |
97 |
|
98 |
|
99 |
#if HAVE_GNUTLS
|
100 |
static int secure_socket_send(oi_socket *socket); |
101 |
static int secure_socket_recv(oi_socket *socket); |
102 |
|
103 |
/* TODO can this be done without ignoring SIGPIPE? */
|
104 |
static ssize_t
|
105 |
nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len) |
106 |
{ |
107 |
oi_socket *socket = (oi_socket*)data; |
108 |
assert(socket->secure); |
109 |
int flags = 0; |
110 |
#ifdef MSG_NOSIGNAL
|
111 |
flags |= MSG_NOSIGNAL; |
112 |
#endif
|
113 |
#ifdef MSG_DONTWAIT
|
114 |
flags |= MSG_DONTWAIT; |
115 |
#endif
|
116 |
int r = send(socket->fd, buf, len, flags);
|
117 |
|
118 |
if(r == -1) { |
119 |
gnutls_transport_set_errno(socket->session, errno); /* necessary ? */
|
120 |
} |
121 |
|
122 |
return r;
|
123 |
} |
124 |
|
125 |
static int |
126 |
secure_handshake(oi_socket *socket) |
127 |
{ |
128 |
assert(socket->secure); |
129 |
|
130 |
int r = gnutls_handshake(socket->session);
|
131 |
|
132 |
if(gnutls_error_is_fatal(r)) {
|
133 |
RAISE_ERROR(socket, OI_ERROR_GNUTLS, r); |
134 |
return ERROR;
|
135 |
} |
136 |
|
137 |
if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
|
138 |
return AGAIN;
|
139 |
|
140 |
oi_socket_reset_timeout(socket); |
141 |
|
142 |
if(!socket->connected) {
|
143 |
socket->connected = TRUE; |
144 |
if(socket->on_connect)
|
145 |
socket->on_connect(socket); |
146 |
} |
147 |
|
148 |
if(socket->read_action)
|
149 |
socket->read_action = secure_socket_recv; |
150 |
|
151 |
if(socket->write_action)
|
152 |
socket->write_action = secure_socket_send; |
153 |
|
154 |
return OKAY;
|
155 |
} |
156 |
|
157 |
static int |
158 |
secure_socket_send(oi_socket *socket) |
159 |
{ |
160 |
ssize_t sent; |
161 |
|
162 |
if(oi_queue_empty(&socket->out_stream)) {
|
163 |
ev_io_stop(socket->loop, &socket->write_watcher); |
164 |
return AGAIN;
|
165 |
} |
166 |
|
167 |
oi_queue *q = oi_queue_last(&socket->out_stream); |
168 |
oi_buf *to_write = oi_queue_data(q, oi_buf, queue); |
169 |
|
170 |
assert(socket->secure); |
171 |
|
172 |
sent = gnutls_record_send( socket->session |
173 |
, to_write->base + to_write->written |
174 |
, to_write->len - to_write->written |
175 |
); |
176 |
|
177 |
if(gnutls_error_is_fatal(sent)) {
|
178 |
RAISE_ERROR(socket, OI_ERROR_GNUTLS, sent); |
179 |
return ERROR;
|
180 |
} |
181 |
|
182 |
if(sent == 0) |
183 |
return AGAIN;
|
184 |
|
185 |
oi_socket_reset_timeout(socket); |
186 |
|
187 |
if(sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
|
188 |
if(GNUTLS_NEED_READ) {
|
189 |
if(socket->read_action) {
|
190 |
socket->read_action = secure_socket_send; |
191 |
} else {
|
192 |
/* TODO GnuTLS needs read but already got EOF */
|
193 |
assert(0 && "needs read but already got EOF"); |
194 |
return ERROR;
|
195 |
} |
196 |
} |
197 |
return AGAIN;
|
198 |
} |
199 |
|
200 |
if(sent > 0) { |
201 |
/* make sure the callbacks are correct */
|
202 |
if(socket->read_action)
|
203 |
socket->read_action = secure_socket_recv; |
204 |
update_write_buffer_after_send(socket, sent); |
205 |
return OKAY;
|
206 |
} |
207 |
|
208 |
assert(0 && "Unhandled return code from gnutls_record_send()!"); |
209 |
return ERROR;
|
210 |
} |
211 |
|
212 |
static int |
213 |
secure_socket_recv(oi_socket *socket) |
214 |
{ |
215 |
char recv_buffer[TCP_MAXWIN];
|
216 |
size_t recv_buffer_size = MIN(TCP_MAXWIN, socket->chunksize); |
217 |
ssize_t recved; |
218 |
|
219 |
assert(socket->secure); |
220 |
|
221 |
recved = gnutls_record_recv(socket->session, recv_buffer, recv_buffer_size); |
222 |
|
223 |
//printf("secure socket recv %d %p\n", recved, socket->on_connect);
|
224 |
|
225 |
if(gnutls_error_is_fatal(recved)) {
|
226 |
RAISE_ERROR(socket, OI_ERROR_GNUTLS, recved); |
227 |
return ERROR;
|
228 |
} |
229 |
|
230 |
if(recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
|
231 |
if(GNUTLS_NEED_WRITE) {
|
232 |
if(socket->write_action) {
|
233 |
printf("need write\n");
|
234 |
socket->write_action = secure_socket_recv; |
235 |
} else {
|
236 |
/* TODO GnuTLS needs send but already closed write end */
|
237 |
assert(0 && "needs read but cannot"); |
238 |
return ERROR;
|
239 |
} |
240 |
} |
241 |
return AGAIN;
|
242 |
} |
243 |
|
244 |
oi_socket_reset_timeout(socket); |
245 |
|
246 |
/* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
|
247 |
* initiated a handshake. In that case the server can only initiate a
|
248 |
* handshake or terminate the connection. */
|
249 |
if(recved == GNUTLS_E_REHANDSHAKE) {
|
250 |
if(socket->write_action) {
|
251 |
socket->read_action = secure_handshake; |
252 |
socket->write_action = secure_handshake; |
253 |
return OKAY;
|
254 |
} else {
|
255 |
/* TODO */
|
256 |
assert(0 && "needs read but cannot"); |
257 |
return ERROR;
|
258 |
} |
259 |
} |
260 |
|
261 |
if(recved >= 0) { |
262 |
/* Got EOF */
|
263 |
if(recved == 0) |
264 |
socket->read_action = NULL;
|
265 |
|
266 |
if(socket->write_action)
|
267 |
socket->write_action = secure_socket_send; |
268 |
|
269 |
if(socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
|
270 |
|
271 |
return OKAY;
|
272 |
} |
273 |
|
274 |
assert(0 && "Unhandled return code from gnutls_record_send()!"); |
275 |
return ERROR;
|
276 |
} |
277 |
|
278 |
static int |
279 |
secure_goodbye(oi_socket *socket, gnutls_close_request_t how) |
280 |
{ |
281 |
assert(socket->secure); |
282 |
|
283 |
int r = gnutls_bye(socket->session, how);
|
284 |
|
285 |
if(gnutls_error_is_fatal(r)) {
|
286 |
RAISE_ERROR(socket, OI_ERROR_GNUTLS, r); |
287 |
return ERROR;
|
288 |
} |
289 |
|
290 |
if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
|
291 |
return AGAIN;
|
292 |
|
293 |
return OKAY;
|
294 |
} |
295 |
|
296 |
static int |
297 |
secure_full_goodbye(oi_socket *socket) |
298 |
{ |
299 |
int r = secure_goodbye(socket, GNUTLS_SHUT_RDWR);
|
300 |
if(OKAY == r) {
|
301 |
return full_close(socket);
|
302 |
} |
303 |
return r;
|
304 |
} |
305 |
|
306 |
static int |
307 |
secure_half_goodbye(oi_socket *socket) |
308 |
{ |
309 |
int r = secure_goodbye(socket, GNUTLS_SHUT_WR);
|
310 |
if(OKAY == r) {
|
311 |
return half_close(socket);
|
312 |
} |
313 |
return r;
|
314 |
} |
315 |
|
316 |
/* Tells the socket to use transport layer security (SSL). liboi does not
|
317 |
* want to make any decisions about security requirements, so the
|
318 |
* majoirty of GnuTLS configuration is left to the user. Only the transport
|
319 |
* layer of GnuTLS is controlled by liboi.
|
320 |
*
|
321 |
* That is, do not use gnutls_transport_* functions.
|
322 |
* Do use the rest of GnuTLS's API.
|
323 |
*/
|
324 |
void
|
325 |
oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session) |
326 |
{ |
327 |
socket->session = session; |
328 |
socket->secure = TRUE; |
329 |
} |
330 |
#endif /* HAVE GNUTLS */ |
331 |
|
332 |
static int |
333 |
socket_send(oi_socket *socket) |
334 |
{ |
335 |
ssize_t sent; |
336 |
|
337 |
assert(socket->secure == FALSE); |
338 |
|
339 |
if(oi_queue_empty(&socket->out_stream)) {
|
340 |
ev_io_stop(socket->loop, &socket->write_watcher); |
341 |
return AGAIN;
|
342 |
} |
343 |
|
344 |
oi_queue *q = oi_queue_last(&socket->out_stream); |
345 |
oi_buf *to_write = oi_queue_data(q, oi_buf, queue); |
346 |
|
347 |
int flags = 0; |
348 |
#ifdef MSG_NOSIGNAL
|
349 |
flags |= MSG_NOSIGNAL; |
350 |
#endif
|
351 |
#ifdef MSG_DONTWAIT
|
352 |
flags |= MSG_DONTWAIT; |
353 |
#endif
|
354 |
|
355 |
/* TODO use writev() here */
|
356 |
|
357 |
sent = send( socket->fd |
358 |
, to_write->base + to_write->written |
359 |
, to_write->len - to_write->written |
360 |
, flags |
361 |
); |
362 |
|
363 |
if(sent < 0) { |
364 |
switch(errno) {
|
365 |
case EAGAIN:
|
366 |
return AGAIN;
|
367 |
|
368 |
case ECONNREFUSED:
|
369 |
case ECONNRESET:
|
370 |
socket->write_action = NULL;
|
371 |
/* TODO maybe just clear write buffer instead of error?
|
372 |
* They should be able to read still from the socket.
|
373 |
*/
|
374 |
RAISE_ERROR(socket, OI_ERROR_SEND, errno); |
375 |
return ERROR;
|
376 |
|
377 |
default:
|
378 |
perror("send()");
|
379 |
assert(0 && "oi shouldn't let this happen."); |
380 |
return ERROR;
|
381 |
} |
382 |
} |
383 |
|
384 |
oi_socket_reset_timeout(socket); |
385 |
|
386 |
if(!socket->connected) {
|
387 |
socket->connected = TRUE; |
388 |
if(socket->on_connect) { socket->on_connect(socket); }
|
389 |
} |
390 |
|
391 |
update_write_buffer_after_send(socket, sent); |
392 |
|
393 |
return OKAY;
|
394 |
} |
395 |
|
396 |
static int |
397 |
socket_recv(oi_socket *socket) |
398 |
{ |
399 |
char buf[TCP_MAXWIN];
|
400 |
size_t buf_size = TCP_MAXWIN; |
401 |
ssize_t recved; |
402 |
|
403 |
assert(socket->secure == FALSE); |
404 |
|
405 |
if(!socket->connected) {
|
406 |
socket->connected = TRUE; |
407 |
if(socket->on_connect) { socket->on_connect(socket); }
|
408 |
return OKAY;
|
409 |
} |
410 |
|
411 |
recved = recv(socket->fd, buf, buf_size, 0);
|
412 |
|
413 |
if(recved < 0) { |
414 |
switch(errno) {
|
415 |
case EAGAIN:
|
416 |
case EINTR:
|
417 |
return AGAIN;
|
418 |
|
419 |
/* A remote host refused to allow the network connection (typically
|
420 |
* because it is not running the requested service). */
|
421 |
case ECONNREFUSED:
|
422 |
RAISE_ERROR(socket, OI_ERROR_RECV, errno); |
423 |
return ERROR;
|
424 |
|
425 |
case ECONNRESET:
|
426 |
RAISE_ERROR(socket, OI_ERROR_RECV, errno); |
427 |
return ERROR;
|
428 |
|
429 |
default:
|
430 |
perror("recv()");
|
431 |
printf("unmatched errno %d\n", errno);
|
432 |
assert(0 && "recv returned error that oi should have caught before."); |
433 |
return ERROR;
|
434 |
} |
435 |
} |
436 |
|
437 |
oi_socket_reset_timeout(socket); |
438 |
|
439 |
if(recved == 0) { |
440 |
oi_socket_read_stop(socket); |
441 |
socket->read_action = NULL;
|
442 |
} |
443 |
|
444 |
/* NOTE: EOF is signaled with recved == 0 on callback */
|
445 |
if(socket->on_read) { socket->on_read(socket, buf, recved); }
|
446 |
|
447 |
return OKAY;
|
448 |
} |
449 |
|
450 |
static void |
451 |
assign_file_descriptor(oi_socket *socket, int fd)
|
452 |
{ |
453 |
socket->fd = fd; |
454 |
|
455 |
ev_io_set (&socket->read_watcher, fd, EV_READ); |
456 |
ev_io_set (&socket->write_watcher, fd, EV_WRITE); |
457 |
|
458 |
socket->read_action = socket_recv; |
459 |
socket->write_action = socket_send; |
460 |
|
461 |
#if HAVE_GNUTLS
|
462 |
if(socket->secure) {
|
463 |
gnutls_transport_set_lowat(socket->session, 0);
|
464 |
gnutls_transport_set_push_function(socket->session, nosigpipe_push); |
465 |
gnutls_transport_set_ptr2 ( socket->session |
466 |
/* recv */ , (gnutls_transport_ptr_t)fd
|
467 |
/* send */ , socket
|
468 |
); |
469 |
socket->read_action = secure_handshake; |
470 |
socket->write_action = secure_handshake; |
471 |
} |
472 |
#endif
|
473 |
} |
474 |
|
475 |
|
476 |
/* Internal callback
|
477 |
* Called by server->connection_watcher.
|
478 |
*/
|
479 |
static void |
480 |
on_connection(struct ev_loop *loop, ev_io *watcher, int revents) |
481 |
{ |
482 |
oi_server *server = watcher->data; |
483 |
|
484 |
// printf("on connection!\n");
|
485 |
|
486 |
assert(server->listening); |
487 |
assert(server->loop == loop); |
488 |
assert(&server->connection_watcher == watcher); |
489 |
|
490 |
if(EV_ERROR & revents) {
|
491 |
oi_server_close(server); |
492 |
return;
|
493 |
} |
494 |
|
495 |
struct sockaddr address; /* connector's address information */ |
496 |
socklen_t addr_len = sizeof(address);
|
497 |
|
498 |
/* TODO accept all possible connections? currently: just one */
|
499 |
int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len); |
500 |
if(fd < 0) { |
501 |
perror("accept()");
|
502 |
return;
|
503 |
} |
504 |
|
505 |
oi_socket *socket = NULL;
|
506 |
if(server->on_connection)
|
507 |
socket = server->on_connection(server, (struct sockaddr*)&address, addr_len);
|
508 |
|
509 |
if(socket == NULL) { |
510 |
close(fd); |
511 |
return;
|
512 |
} |
513 |
|
514 |
int flags = fcntl(fd, F_GETFL, 0); |
515 |
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
516 |
if(r < 0) { |
517 |
/* TODO error report */
|
518 |
} |
519 |
|
520 |
#ifdef SO_NOSIGPIPE
|
521 |
flags = 1;
|
522 |
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
|
523 |
#endif
|
524 |
|
525 |
socket->server = server; |
526 |
assign_file_descriptor(socket, fd); |
527 |
oi_socket_attach(socket, loop); |
528 |
} |
529 |
|
530 |
int
|
531 |
oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
|
532 |
{ |
533 |
int fd = -1; |
534 |
struct linger ling = {0, 0}; |
535 |
assert(server->listening == FALSE); |
536 |
|
537 |
fd = socket( addrinfo->ai_family |
538 |
, addrinfo->ai_socktype |
539 |
, addrinfo->ai_protocol |
540 |
); |
541 |
if(fd < 0) { |
542 |
perror("socket()");
|
543 |
return -1; |
544 |
} |
545 |
|
546 |
int flags = fcntl(fd, F_GETFL, 0); |
547 |
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
548 |
if(r < 0) { |
549 |
perror("fcntl()");
|
550 |
return -1; |
551 |
} |
552 |
|
553 |
flags = 1;
|
554 |
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); |
555 |
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); |
556 |
setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); |
557 |
|
558 |
/* XXX: Sending single byte chunks in a response body? Perhaps there is a
|
559 |
* need to enable the Nagel algorithm dynamically. For now disabling.
|
560 |
*/
|
561 |
//setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
|
562 |
|
563 |
if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0) { |
564 |
perror("bind()");
|
565 |
close(fd); |
566 |
return -1; |
567 |
} |
568 |
|
569 |
if (listen(fd, server->backlog) < 0) { |
570 |
perror("listen()");
|
571 |
close(fd); |
572 |
return -1; |
573 |
} |
574 |
|
575 |
server->fd = fd; |
576 |
server->listening = TRUE; |
577 |
ev_io_set (&server->connection_watcher, server->fd, EV_READ); |
578 |
|
579 |
return 0; |
580 |
} |
581 |
|
582 |
/**
|
583 |
* Stops the server. Will not accept new connections. Does not drop
|
584 |
* existing connections.
|
585 |
*/
|
586 |
void
|
587 |
oi_server_close(oi_server *server) |
588 |
{ |
589 |
if(server->listening) {
|
590 |
oi_server_detach(server); |
591 |
close(server->fd); |
592 |
/* TODO do this on the loop? check return value? */
|
593 |
server->listening = FALSE; |
594 |
} |
595 |
} |
596 |
|
597 |
void
|
598 |
oi_server_attach (oi_server *server, struct ev_loop *loop)
|
599 |
{ |
600 |
ev_io_start (loop, &server->connection_watcher); |
601 |
server->loop = loop; |
602 |
} |
603 |
|
604 |
void
|
605 |
oi_server_detach (oi_server *server) |
606 |
{ |
607 |
ev_io_stop (server->loop, &server->connection_watcher); |
608 |
server->loop = NULL;
|
609 |
} |
610 |
|
611 |
void
|
612 |
oi_server_init(oi_server *server, int backlog)
|
613 |
{ |
614 |
server->backlog = backlog; |
615 |
server->listening = FALSE; |
616 |
server->fd = -1;
|
617 |
server->connection_watcher.data = server; |
618 |
ev_init (&server->connection_watcher, on_connection); |
619 |
|
620 |
server->on_connection = NULL;
|
621 |
server->on_error = NULL;
|
622 |
server->data = NULL;
|
623 |
} |
624 |
|
625 |
/* Internal callback. called by socket->timeout_watcher */
|
626 |
static void |
627 |
on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents) |
628 |
{ |
629 |
oi_socket *socket = watcher->data; |
630 |
|
631 |
assert(watcher == &socket->timeout_watcher); |
632 |
|
633 |
// printf("on_timeout\n");
|
634 |
|
635 |
if(socket->on_timeout) { socket->on_timeout(socket); }
|
636 |
|
637 |
|
638 |
/* TODD set timer to zero */
|
639 |
full_close(socket); |
640 |
} |
641 |
|
642 |
static void |
643 |
release_write_buffer(oi_socket *socket) |
644 |
{ |
645 |
while(!oi_queue_empty(&socket->out_stream)) {
|
646 |
oi_queue *q = oi_queue_last(&socket->out_stream); |
647 |
oi_buf *buf = oi_queue_data(q, oi_buf, queue); |
648 |
oi_queue_remove(q); |
649 |
if(buf->release) { buf->release(buf); }
|
650 |
} |
651 |
} |
652 |
|
653 |
/* Internal callback. called by socket->read_watcher */
|
654 |
static void |
655 |
on_io_event(struct ev_loop *loop, ev_io *watcher, int revents) |
656 |
{ |
657 |
oi_socket *socket = watcher->data; |
658 |
|
659 |
if(revents & EV_ERROR) {
|
660 |
RAISE_ERROR(socket, OI_ERROR_EV, 0);
|
661 |
goto close;
|
662 |
} |
663 |
|
664 |
int r;
|
665 |
int have_read_event = TRUE;
|
666 |
int have_write_event = TRUE;
|
667 |
|
668 |
while(have_read_event || have_write_event) {
|
669 |
|
670 |
if(socket->read_action) {
|
671 |
r = socket->read_action(socket); |
672 |
if(r == ERROR) goto close; |
673 |
if(r == AGAIN) have_read_event = FALSE;
|
674 |
} else {
|
675 |
have_read_event = FALSE; |
676 |
} |
677 |
|
678 |
if(socket->write_action) {
|
679 |
r = socket->write_action(socket); |
680 |
if(r == ERROR) goto close; |
681 |
if(r == AGAIN) have_write_event = FALSE;
|
682 |
} else {
|
683 |
have_write_event = FALSE; |
684 |
} |
685 |
|
686 |
|
687 |
if(socket->read_watcher.active == FALSE)
|
688 |
have_read_event = FALSE; |
689 |
if(socket->write_watcher.active == FALSE)
|
690 |
have_write_event = FALSE; |
691 |
} |
692 |
|
693 |
if(socket->write_action == NULL && socket->read_action == NULL) |
694 |
goto close;
|
695 |
|
696 |
return;
|
697 |
|
698 |
close:
|
699 |
release_write_buffer(socket); |
700 |
|
701 |
ev_clear_pending (socket->loop, &socket->write_watcher); |
702 |
ev_clear_pending (socket->loop, &socket->read_watcher); |
703 |
ev_clear_pending (socket->loop, &socket->timeout_watcher); |
704 |
|
705 |
oi_socket_detach(socket); |
706 |
|
707 |
if(socket->on_close) { socket->on_close(socket); }
|
708 |
/* WARNING: user can free socket in on_close so no more
|
709 |
* access beyond this point. */
|
710 |
} |
711 |
|
712 |
/**
|
713 |
* If using SSL do consider setting
|
714 |
* gnutls_db_set_retrieve_function (socket->session, _);
|
715 |
* gnutls_db_set_remove_function (socket->session, _);
|
716 |
* gnutls_db_set_store_function (socket->session, _);
|
717 |
* gnutls_db_set_ptr (socket->session, _);
|
718 |
*/
|
719 |
void
|
720 |
oi_socket_init(oi_socket *socket, float timeout)
|
721 |
{ |
722 |
socket->fd = -1;
|
723 |
socket->server = NULL;
|
724 |
socket->loop = NULL;
|
725 |
socket->connected = FALSE; |
726 |
|
727 |
oi_queue_init(&socket->out_stream); |
728 |
|
729 |
ev_init (&socket->write_watcher, on_io_event); |
730 |
socket->write_watcher.data = socket; |
731 |
|
732 |
ev_init(&socket->read_watcher, on_io_event); |
733 |
socket->read_watcher.data = socket; |
734 |
|
735 |
socket->secure = FALSE; |
736 |
socket->wait_for_secure_hangup = FALSE; |
737 |
#if HAVE_GNUTLS
|
738 |
socket->session = NULL;
|
739 |
#endif
|
740 |
|
741 |
/* TODO higher resolution timer */
|
742 |
ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout);
|
743 |
socket->timeout_watcher.data = socket; |
744 |
|
745 |
socket->read_action = NULL;
|
746 |
socket->write_action = NULL;
|
747 |
|
748 |
socket->chunksize = TCP_MAXWIN; |
749 |
socket->on_connect = NULL;
|
750 |
socket->on_read = NULL;
|
751 |
socket->on_drain = NULL;
|
752 |
socket->on_error = NULL;
|
753 |
socket->on_timeout = NULL;
|
754 |
} |
755 |
|
756 |
void
|
757 |
oi_socket_write_eof (oi_socket *socket) |
758 |
{ |
759 |
#if HAVE_GNUTLS
|
760 |
/* try to hang up properly for secure connections */
|
761 |
if(socket->secure)
|
762 |
{ |
763 |
if( socket->connected /* completed handshake */ |
764 |
&& socket->write_action /* write end is open */
|
765 |
) |
766 |
{ |
767 |
socket->write_action = secure_half_goodbye; |
768 |
if(socket->loop)
|
769 |
ev_io_start(socket->loop, &socket->write_watcher); |
770 |
return;
|
771 |
} |
772 |
/* secure servers cannot handle half-closed connections? */
|
773 |
full_close(socket); |
774 |
return;
|
775 |
} |
776 |
#endif // HAVE_GNUTLS |
777 |
|
778 |
if(socket->write_action)
|
779 |
half_close(socket); |
780 |
else
|
781 |
full_close(socket); |
782 |
} |
783 |
|
784 |
void
|
785 |
oi_socket_close (oi_socket *socket) |
786 |
{ |
787 |
#if HAVE_GNUTLS
|
788 |
/* try to hang up properly for secure connections */
|
789 |
if( socket->secure
|
790 |
&& socket->connected /* completed handshake */
|
791 |
&& socket->write_action /* write end is open */
|
792 |
) |
793 |
{ |
794 |
if(socket->wait_for_secure_hangup && socket->read_action) {
|
795 |
socket->write_action = secure_full_goodbye; |
796 |
socket->read_action = secure_full_goodbye; |
797 |
} else {
|
798 |
socket->write_action = secure_half_goodbye; |
799 |
socket->read_action = NULL;
|
800 |
} |
801 |
|
802 |
if(socket->loop)
|
803 |
ev_io_start(socket->loop, &socket->write_watcher); |
804 |
|
805 |
return;
|
806 |
} |
807 |
#endif // HAVE_GNUTLS |
808 |
|
809 |
full_close(socket); |
810 |
} |
811 |
|
812 |
/*
|
813 |
* Resets the timeout to stay alive for another socket->timeout seconds
|
814 |
*/
|
815 |
void
|
816 |
oi_socket_reset_timeout(oi_socket *socket) |
817 |
{ |
818 |
ev_timer_again(socket->loop, &socket->timeout_watcher); |
819 |
} |
820 |
|
821 |
/**
|
822 |
* Writes a string to the socket. This is actually sets a watcher which may
|
823 |
* take multiple iterations to write the entire string.
|
824 |
*/
|
825 |
void
|
826 |
oi_socket_write(oi_socket *socket, oi_buf *buf) |
827 |
{ |
828 |
if(socket->write_action == NULL) |
829 |
return;
|
830 |
|
831 |
oi_queue_insert_head(&socket->out_stream, &buf->queue); |
832 |
|
833 |
buf->written = 0;
|
834 |
ev_io_start(socket->loop, &socket->write_watcher); |
835 |
} |
836 |
|
837 |
static void |
838 |
free_simple_buf ( oi_buf *buf ) |
839 |
{ |
840 |
free(buf->base); |
841 |
free(buf); |
842 |
} |
843 |
|
844 |
/* Writes a string to the socket.
|
845 |
* NOTE: Allocates memory. Avoid for performance applications.
|
846 |
*/
|
847 |
void
|
848 |
oi_socket_write_simple(oi_socket *socket, const char *str, size_t len) |
849 |
{ |
850 |
oi_buf *buf = malloc(sizeof(oi_buf));
|
851 |
buf->release = free_simple_buf; |
852 |
buf->base = strdup(str); |
853 |
buf->len = len; |
854 |
|
855 |
oi_socket_write(socket, buf); |
856 |
} |
857 |
|
858 |
void
|
859 |
oi_socket_attach(oi_socket *socket, struct ev_loop *loop)
|
860 |
{ |
861 |
socket->loop = loop; |
862 |
|
863 |
ev_timer_again(loop, &socket->timeout_watcher); |
864 |
|
865 |
if(socket->read_action)
|
866 |
ev_io_start(loop, &socket->read_watcher); |
867 |
|
868 |
if(socket->write_action)
|
869 |
ev_io_start(loop, &socket->write_watcher); |
870 |
|
871 |
/* make sure the io_event happens soon in the case we're being reattached */
|
872 |
ev_feed_event(loop, &socket->read_watcher, EV_READ); |
873 |
} |
874 |
|
875 |
void
|
876 |
oi_socket_detach(oi_socket *socket) |
877 |
{ |
878 |
if(socket->loop) {
|
879 |
ev_io_stop(socket->loop, &socket->write_watcher); |
880 |
ev_io_stop(socket->loop, &socket->read_watcher); |
881 |
ev_timer_stop(socket->loop, &socket->timeout_watcher); |
882 |
socket->loop = NULL;
|
883 |
} |
884 |
} |
885 |
|
886 |
void
|
887 |
oi_socket_read_stop (oi_socket *socket) |
888 |
{ |
889 |
ev_io_stop(socket->loop, &socket->read_watcher); |
890 |
ev_clear_pending (socket->loop, &socket->read_watcher); |
891 |
} |
892 |
|
893 |
void
|
894 |
oi_socket_read_start (oi_socket *socket) |
895 |
{ |
896 |
if(socket->read_action) {
|
897 |
ev_io_start(socket->loop, &socket->read_watcher); |
898 |
/* XXX feed event? */
|
899 |
} |
900 |
} |
901 |
|
902 |
int
|
903 |
oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
|
904 |
{ |
905 |
int fd = socket( addrinfo->ai_family
|
906 |
, addrinfo->ai_socktype |
907 |
, addrinfo->ai_protocol |
908 |
); |
909 |
if(fd < 0) { |
910 |
perror("socket()");
|
911 |
return -1; |
912 |
} |
913 |
|
914 |
int flags = fcntl(fd, F_GETFL, 0); |
915 |
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
916 |
if(r < 0) { |
917 |
perror("fcntl()");
|
918 |
return -1; |
919 |
} |
920 |
|
921 |
#ifdef SO_NOSIGPIPE
|
922 |
flags = 1;
|
923 |
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
|
924 |
#endif
|
925 |
|
926 |
r = connect( fd |
927 |
, addrinfo->ai_addr |
928 |
, addrinfo->ai_addrlen |
929 |
); |
930 |
|
931 |
if(r < 0 && errno != EINPROGRESS) { |
932 |
perror("connect");
|
933 |
close(fd); |
934 |
return -1; |
935 |
} |
936 |
|
937 |
assign_file_descriptor(s, fd); |
938 |
|
939 |
return 0; |
940 |
} |
941 |
|