The data contained in this repository can be downloaded to your computer using one of several clients.
Please see the documentation of your version control software client for more information.

Please select the desired protocol below to get the URL.

This URL has Read-Only access.

Statistics
| Branch: | Revision:

main_repo / deps / liboi / oi_socket.c @ 90fc8d36

History | View | Annotate | Download (21.6 KB)

1 40c0f755 Ryan
#include <stdio.h>
2
#include <stdlib.h>
3
#include <assert.h>
4
#include <unistd.h> /* close() */
5
#include <fcntl.h>  /* fcntl() */
6
#include <errno.h> /* for the default methods */
7
#include <string.h> /* memset */
8
9
#include <netinet/tcp.h> /* TCP_NODELAY */
10
11
#include <ev.h>
12
#include <oi_socket.h>
13
14 90fc8d36 Ryan
#if EV_MULTIPLICITY
15
# define SOCKET_LOOP_ socket->loop, 
16
# define SERVER_LOOP_ server->loop, 
17
#else
18
# define SOCKET_LOOP_ 
19
# define SERVER_LOOP_
20
#endif // EV_MULTIPLICITY
21
22 40c0f755 Ryan
#if HAVE_GNUTLS
23
# include <gnutls/gnutls.h>
24
# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1)
25
# define GNUTLS_NEED_READ (gnutls_record_get_direction(socket->session) == 0)
26
#endif
27
28
#undef TRUE
29
#define TRUE 1
30
#undef FALSE
31
#define FALSE 0
32
#undef MIN
33
#define MIN(a,b) ((a) < (b) ? (a) : (b))
34
35
#define OKAY  0
36
#define AGAIN 1
37
#define ERROR 2 
38
39
#define RAISE_ERROR(s, _domain, _code) do { \
40
  if(s->on_error) { \
41
    struct oi_error __oi_error; \
42
    __oi_error.domain = _domain; \
43
    __oi_error.code = _code; \
44
    s->on_error(s, __oi_error); \
45
  } \
46
} while(0) \
47
48
static int 
49
full_close(oi_socket *socket)
50
{
51
  if(-1 == close(socket->fd) && errno == EINTR) {
52
    /* TODO fd still open. next loop call close again? */
53
    assert(0 && "implement me");  
54
    return ERROR;
55
  }
56
57
  socket->read_action = NULL;
58
  socket->write_action = NULL;
59
60 90fc8d36 Ryan
  if(socket->attached) {
61
    ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ);
62 40c0f755 Ryan
  }
63
  return OKAY;
64
}
65
66
static int 
67
half_close(oi_socket *socket)
68
{
69
  int r = shutdown(socket->fd, SHUT_WR);
70
71
  if(r == -1) {
72
    RAISE_ERROR(socket, OI_ERROR_SHUTDOWN, errno);
73
    return ERROR;
74
  }
75
76
  socket->write_action = NULL;
77
78
  /* TODO set timer to zero  so we get a callback soon */
79
  return OKAY;
80
}
81
82
static void
83
update_write_buffer_after_send(oi_socket *socket, ssize_t sent)
84
{
85
  oi_queue *q = oi_queue_last(&socket->out_stream);
86
  oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
87
  to_write->written += sent;
88
  socket->written += sent;
89
90
  if(to_write->written == to_write->len) {
91
92
    oi_queue_remove(q);
93
94
    if(to_write->release) {
95
      to_write->release(to_write);
96
    }  
97
98
    if(oi_queue_empty(&socket->out_stream)) {
99 90fc8d36 Ryan
      ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
100 40c0f755 Ryan
      if(socket->on_drain)
101
        socket->on_drain(socket);
102
    }
103
  }
104
}
105
106
107
#if HAVE_GNUTLS
108
static int secure_socket_send(oi_socket *socket);
109
static int secure_socket_recv(oi_socket *socket);
110
111
/* TODO can this be done without ignoring SIGPIPE?  */
112
static ssize_t 
113
nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len)
114
{
115
  oi_socket *socket = (oi_socket*)data;
116
  assert(socket->secure);
117
  int flags = 0;
118
#ifdef MSG_NOSIGNAL
119
  flags |= MSG_NOSIGNAL;
120
#endif
121
#ifdef MSG_DONTWAIT
122
  flags |= MSG_DONTWAIT;
123
#endif
124
  int r = send(socket->fd, buf, len, flags);
125
126
  if(r == -1) {
127
    gnutls_transport_set_errno(socket->session, errno); /* necessary ? */
128
  }
129
130
  return r;
131
}
132
133
static int
134
secure_handshake(oi_socket *socket)
135
{
136
  assert(socket->secure);
137
138
  int r = gnutls_handshake(socket->session);
139
140
  if(gnutls_error_is_fatal(r)) {
141
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
142
    return ERROR;
143
  }
144
145
  if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
146
    return AGAIN;
147
148
  oi_socket_reset_timeout(socket);
149
150
  if(!socket->connected) {
151
    socket->connected = TRUE;
152
    if(socket->on_connect)
153
      socket->on_connect(socket);
154
  }
155
156
  if(socket->read_action)
157
    socket->read_action = secure_socket_recv;
158
 
159
  if(socket->write_action)
160
    socket->write_action = secure_socket_send;
161
162
  return OKAY;
163
}
164
165
static int
166
secure_socket_send(oi_socket *socket)
167
{
168
  ssize_t sent;
169
170
  if(oi_queue_empty(&socket->out_stream)) {
171 90fc8d36 Ryan
    ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
172 40c0f755 Ryan
    return AGAIN;
173
  }
174
175
  oi_queue *q = oi_queue_last(&socket->out_stream);
176
  oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
177
178
  assert(socket->secure);
179
180
  sent = gnutls_record_send( socket->session
181
                           , to_write->base + to_write->written
182
                           , to_write->len - to_write->written
183
                           ); 
184
185
  if(gnutls_error_is_fatal(sent)) {
186
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, sent);
187
    return ERROR;
188
  }
189
190
  if(sent == 0)
191
    return AGAIN;
192
193
  oi_socket_reset_timeout(socket);
194
195
  if(sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
196
    if(GNUTLS_NEED_READ) {
197
      if(socket->read_action) {
198
        socket->read_action = secure_socket_send;
199
      } else {
200
        /* TODO GnuTLS needs read but already got EOF */
201
        assert(0 && "needs read but already got EOF");
202
        return ERROR;
203
      }
204
    }
205
    return AGAIN;
206
  }
207
208
  if(sent > 0) {
209
    /* make sure the callbacks are correct */
210
    if(socket->read_action)
211
      socket->read_action = secure_socket_recv;
212
    update_write_buffer_after_send(socket, sent);
213
    return OKAY;
214
  }
215
216
  assert(0 && "Unhandled return code from gnutls_record_send()!");
217
  return ERROR;
218
}
219
220
static int
221
secure_socket_recv(oi_socket *socket)
222
{
223
  char recv_buffer[TCP_MAXWIN];
224
  size_t recv_buffer_size = MIN(TCP_MAXWIN, socket->chunksize);
225
  ssize_t recved;
226
227
  assert(socket->secure);
228
229
  recved = gnutls_record_recv(socket->session, recv_buffer, recv_buffer_size);
230
231
  //printf("secure socket recv %d %p\n", recved, socket->on_connect);
232
233
  if(gnutls_error_is_fatal(recved)) {
234
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, recved);
235
    return ERROR;
236
  }
237
238
  if(recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN)  {
239
    if(GNUTLS_NEED_WRITE) {
240
      if(socket->write_action) {
241
        printf("need write\n");
242
        socket->write_action = secure_socket_recv;
243
      } else {
244
        /* TODO GnuTLS needs send but already closed write end */
245
        assert(0 && "needs read but cannot");
246
        return ERROR;
247
      }
248
    }
249
    return AGAIN;
250
  }
251
252
  oi_socket_reset_timeout(socket);
253
254
  /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
255
   * initiated a handshake. In that case the server can only initiate a
256
   * handshake or terminate the connection. */
257
  if(recved == GNUTLS_E_REHANDSHAKE) {
258
    if(socket->write_action) {
259
      socket->read_action = secure_handshake;
260
      socket->write_action = secure_handshake;
261
      return OKAY;
262
    } else {
263
      /* TODO */
264
      assert(0 && "needs read but cannot");
265
      return ERROR;
266
    }
267
  }
268
269
  if(recved >= 0) {
270
    /* Got EOF */
271
    if(recved == 0)
272
      socket->read_action = NULL;
273
274
    if(socket->write_action) 
275
      socket->write_action = secure_socket_send;
276
277
    if(socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
278
279
    return OKAY;
280
  }
281
282
  assert(0 && "Unhandled return code from gnutls_record_send()!");
283
  return ERROR;
284
}
285
286
static int
287
secure_goodbye(oi_socket *socket, gnutls_close_request_t how)
288
{
289
  assert(socket->secure);
290
291
  int r = gnutls_bye(socket->session, how);
292
293
  if(gnutls_error_is_fatal(r))  {
294
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
295
    return ERROR;
296
  }
297
298
  if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
299
    return AGAIN;
300
301
  return OKAY;
302
}
303
304
static int
305
secure_full_goodbye(oi_socket *socket)
306
{
307
  int r = secure_goodbye(socket, GNUTLS_SHUT_RDWR);
308
  if(OKAY == r) {
309
    return full_close(socket);
310
  }
311
  return r;
312
}
313
314
static int
315
secure_half_goodbye(oi_socket *socket)
316
{
317
  int r = secure_goodbye(socket, GNUTLS_SHUT_WR);
318
  if(OKAY == r) {
319
    return half_close(socket);
320
  }
321
  return r;
322
}
323
324
/* Tells the socket to use transport layer security (SSL). liboi does not
325
 * want to make any decisions about security requirements, so the
326
 * majoirty of GnuTLS configuration is left to the user. Only the transport
327
 * layer of GnuTLS is controlled by liboi.
328
 *
329
 * That is, do not use gnutls_transport_* functions. 
330
 * Do use the rest of GnuTLS's API.
331
 */
332
void
333
oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
334
{
335
  socket->session = session;
336
  socket->secure = TRUE;
337
}
338
#endif /* HAVE GNUTLS */
339
340
static int
341
socket_send(oi_socket *socket)
342
{
343
  ssize_t sent;
344
345
  assert(socket->secure == FALSE);
346
347
  if(oi_queue_empty(&socket->out_stream)) {
348 90fc8d36 Ryan
    ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
349 40c0f755 Ryan
    return AGAIN;
350
  }
351
352
  oi_queue *q = oi_queue_last(&socket->out_stream);
353
  oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
354
  
355
  int flags = 0;
356
#ifdef MSG_NOSIGNAL
357
  flags |= MSG_NOSIGNAL;
358
#endif
359
#ifdef MSG_DONTWAIT
360
  flags |= MSG_DONTWAIT;
361
#endif
362
363
  /* TODO use writev() here */
364
365
  sent = send( socket->fd
366
             , to_write->base + to_write->written
367
             , to_write->len - to_write->written
368
             , flags
369
             );
370
371
  if(sent < 0) {
372
    switch(errno) {
373
      case EAGAIN:
374
        return AGAIN;
375
376
      case ECONNREFUSED:
377
      case ECONNRESET:
378
        socket->write_action = NULL;
379
        /* TODO maybe just clear write buffer instead of error? 
380
         * They should be able to read still from the socket. 
381
         */
382
        RAISE_ERROR(socket, OI_ERROR_SEND, errno);
383
        return ERROR;
384
385
      default:
386
        perror("send()");
387
        assert(0 && "oi shouldn't let this happen.");
388
        return ERROR;
389
    }
390
  }
391
392
  oi_socket_reset_timeout(socket);
393
394
  if(!socket->connected) {
395
    socket->connected = TRUE;
396
    if(socket->on_connect) { socket->on_connect(socket); }
397
  }
398
399
  update_write_buffer_after_send(socket, sent);
400
401
  return OKAY;
402
}
403
404
static int
405
socket_recv(oi_socket *socket)
406
{
407
  char buf[TCP_MAXWIN];
408
  size_t buf_size = TCP_MAXWIN;
409
  ssize_t recved;
410
411
  assert(socket->secure == FALSE);
412
413
  if(!socket->connected) {
414
    socket->connected = TRUE;
415
    if(socket->on_connect) { socket->on_connect(socket); }
416
    return OKAY;
417
  }
418
419
  recved = recv(socket->fd, buf, buf_size, 0);
420
421
  if(recved < 0) {
422
    switch(errno) {
423
      case EAGAIN: 
424
      case EINTR:  
425
        return AGAIN;
426
427
      /* A remote host refused to allow the network connection (typically
428
       * because it is not running the requested service). */
429
      case ECONNREFUSED:
430
        RAISE_ERROR(socket, OI_ERROR_RECV, errno);
431
        return ERROR; 
432
433
      case ECONNRESET:
434
        RAISE_ERROR(socket, OI_ERROR_RECV, errno);
435
        return ERROR; 
436
437
      default:
438
        perror("recv()");
439
        printf("unmatched errno %d\n", errno);
440
        assert(0 && "recv returned error that oi should have caught before.");
441
        return ERROR;
442
    }
443
  }
444
445
  oi_socket_reset_timeout(socket);
446
447
  if(recved == 0) {
448
    oi_socket_read_stop(socket);
449
    socket->read_action = NULL;
450
  }
451
452
  /* NOTE: EOF is signaled with recved == 0 on callback */
453
  if(socket->on_read) { socket->on_read(socket, buf, recved); }
454
455
  return OKAY;
456
}
457
458
static void
459
assign_file_descriptor(oi_socket *socket, int fd)
460
{
461
  socket->fd = fd;
462
463
  ev_io_set (&socket->read_watcher, fd, EV_READ);
464
  ev_io_set (&socket->write_watcher, fd, EV_WRITE);
465
466
  socket->read_action = socket_recv;
467
  socket->write_action = socket_send;
468
469
#if HAVE_GNUTLS
470
  if(socket->secure) {
471
    gnutls_transport_set_lowat(socket->session, 0); 
472
    gnutls_transport_set_push_function(socket->session, nosigpipe_push);
473
    gnutls_transport_set_ptr2 ( socket->session
474
                 /* recv */   , (gnutls_transport_ptr_t)fd 
475
                 /* send */   , socket 
476
                              );
477
    socket->read_action = secure_handshake;
478
    socket->write_action = secure_handshake;
479
  }
480
#endif 
481
}
482
483
484
/* Internal callback 
485
 * Called by server->connection_watcher.
486
 */
487
static void 
488 90fc8d36 Ryan
on_connection(EV_P_ ev_io *watcher, int revents)
489 40c0f755 Ryan
{
490
  oi_server *server = watcher->data;
491
492
 // printf("on connection!\n");
493
494
  assert(server->listening);
495 90fc8d36 Ryan
#if EV_MULTIPLICITY
496 40c0f755 Ryan
  assert(server->loop == loop);
497 90fc8d36 Ryan
#endif 
498 40c0f755 Ryan
  assert(&server->connection_watcher == watcher);
499
  
500
  if(EV_ERROR & revents) {
501
    oi_server_close(server);
502
    return;
503
  }
504
  
505
  struct sockaddr address; /* connector's address information */
506
  socklen_t addr_len = sizeof(address);
507
  
508
  /* TODO accept all possible connections? currently: just one */
509
  int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len);
510
  if(fd < 0) {
511
    perror("accept()");
512
    return;
513
  }
514
515
  oi_socket *socket = NULL;
516
  if(server->on_connection)
517
    socket = server->on_connection(server, (struct sockaddr*)&address, addr_len);
518
519
  if(socket == NULL) {
520
    close(fd);
521
    return;
522
  } 
523
  
524
  int flags = fcntl(fd, F_GETFL, 0);
525
  int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
526
  if(r < 0) {
527
    /* TODO error report */
528
  }
529
  
530
#ifdef SO_NOSIGPIPE
531
  flags = 1;
532
  setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
533
#endif
534
535
  socket->server = server;
536
  assign_file_descriptor(socket, fd);
537 90fc8d36 Ryan
  oi_socket_attach(EV_A_ socket);
538 40c0f755 Ryan
}
539
540
int
541
oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
542
{
543
  int fd = -1;
544
  struct linger ling = {0, 0};
545
  assert(server->listening == FALSE);
546
547
  fd = socket( addrinfo->ai_family
548
             , addrinfo->ai_socktype
549
             , addrinfo->ai_protocol
550
             );
551
  if(fd < 0) {
552
    perror("socket()");
553
    return -1;
554
  }
555
556
  int flags = fcntl(fd, F_GETFL, 0);
557
  int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
558
  if(r < 0) {
559
    perror("fcntl()");
560
    return -1;
561
  }
562
563
  flags = 1;
564
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
565
  setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
566
  setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
567
568
  /* XXX: Sending single byte chunks in a response body? Perhaps there is a
569
   * need to enable the Nagel algorithm dynamically. For now disabling.
570
   */
571
  //setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
572
573
  if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0) {
574
    perror("bind()");
575
    close(fd);
576
    return -1;
577
  }
578
  
579
  if (listen(fd, server->backlog) < 0) {
580
    perror("listen()");
581
    close(fd);
582
    return -1;
583
  }
584
  
585
  server->fd = fd;
586
  server->listening = TRUE;
587
  ev_io_set (&server->connection_watcher, server->fd, EV_READ);
588
  
589
  return 0;
590
}
591
592
/**
593
 * Stops the server. Will not accept new connections.  Does not drop
594
 * existing connections.
595
 */
596
void 
597
oi_server_close(oi_server *server)
598
{
599
  if(server->listening) {
600
    oi_server_detach(server);
601
    close(server->fd);
602
    /* TODO do this on the loop? check return value? */
603
    server->listening = FALSE;
604
  }
605
}
606
607
void
608 90fc8d36 Ryan
oi_server_attach (EV_P_ oi_server *server)
609 40c0f755 Ryan
{
610 90fc8d36 Ryan
  ev_io_start (EV_A_ &server->connection_watcher);
611
#if EV_MULTIPLICITY
612 40c0f755 Ryan
  server->loop = loop;
613 90fc8d36 Ryan
#endif
614
  server->attached = TRUE;
615 40c0f755 Ryan
}
616
617
void
618
oi_server_detach (oi_server *server)
619
{
620 90fc8d36 Ryan
  ev_io_stop (SERVER_LOOP_ &server->connection_watcher);
621
#if EV_MULTIPLICITY
622 40c0f755 Ryan
  server->loop = NULL;
623 90fc8d36 Ryan
#endif
624
  server->attached = FALSE;
625 40c0f755 Ryan
}
626
627
void 
628
oi_server_init(oi_server *server, int backlog)
629
{
630
  server->backlog = backlog;
631 90fc8d36 Ryan
  server->attached = FALSE;
632 40c0f755 Ryan
  server->listening = FALSE;
633
  server->fd = -1;
634
  server->connection_watcher.data = server;
635
  ev_init (&server->connection_watcher, on_connection);
636
637
  server->on_connection = NULL;
638
  server->on_error = NULL;
639
  server->data = NULL;
640
}
641
642
/* Internal callback. called by socket->timeout_watcher */
643
static void 
644 90fc8d36 Ryan
on_timeout(EV_P_ ev_timer *watcher, int revents)
645 40c0f755 Ryan
{
646
  oi_socket *socket = watcher->data;
647
648
  assert(watcher == &socket->timeout_watcher);
649
650
 // printf("on_timeout\n");
651
652
  if(socket->on_timeout) { socket->on_timeout(socket); }
653
654
655
  /* TODD set timer to zero */
656
  full_close(socket);
657
}
658
659
static void
660
release_write_buffer(oi_socket *socket)
661
{
662
  while(!oi_queue_empty(&socket->out_stream)) {
663
    oi_queue *q = oi_queue_last(&socket->out_stream);
664
    oi_buf *buf = oi_queue_data(q, oi_buf, queue);
665
    oi_queue_remove(q);
666
    if(buf->release) { buf->release(buf); }
667
  }
668
}
669
670
/* Internal callback. called by socket->read_watcher */
671
static void 
672 90fc8d36 Ryan
on_io_event(EV_P_ ev_io *watcher, int revents)
673 40c0f755 Ryan
{
674
  oi_socket *socket = watcher->data;
675
676
  if(revents & EV_ERROR) {
677
    RAISE_ERROR(socket, OI_ERROR_EV, 0);
678
    goto close;
679
  }
680
681
  int r;
682
  int have_read_event = TRUE;
683
  int have_write_event = TRUE;
684
685
  while(have_read_event || have_write_event) {
686
687
    if(socket->read_action) {
688
      r = socket->read_action(socket);
689
      if(r == ERROR) goto close;
690
      if(r == AGAIN) have_read_event = FALSE;
691
    } else {
692
      have_read_event = FALSE;
693
    }
694
695
    if(socket->write_action) {
696
      r = socket->write_action(socket);
697
      if(r == ERROR) goto close;
698
      if(r == AGAIN) have_write_event = FALSE;
699
    } else {
700
      have_write_event = FALSE;
701
    }
702
703
704
    if(socket->read_watcher.active == FALSE)
705
      have_read_event = FALSE;
706
    if(socket->write_watcher.active == FALSE)
707
      have_write_event = FALSE;
708
  }
709
710
  if(socket->write_action == NULL && socket->read_action == NULL)
711
    goto close;
712
713
  return;
714
715
close:
716
  release_write_buffer(socket);
717
718 90fc8d36 Ryan
  ev_clear_pending (EV_A_ &socket->write_watcher);
719
  ev_clear_pending (EV_A_ &socket->read_watcher);
720
  ev_clear_pending (EV_A_ &socket->timeout_watcher);
721 40c0f755 Ryan
722
  oi_socket_detach(socket);
723
724
  if(socket->on_close) { socket->on_close(socket); }
725
  /* WARNING: user can free socket in on_close so no more 
726
   * access beyond this point. */
727
}
728
729
/**
730
 * If using SSL do consider setting
731
 *   gnutls_db_set_retrieve_function (socket->session, _);
732
 *   gnutls_db_set_remove_function (socket->session, _);
733
 *   gnutls_db_set_store_function (socket->session, _);
734
 *   gnutls_db_set_ptr (socket->session, _);
735
 */
736
void 
737
oi_socket_init(oi_socket *socket, float timeout)
738
{
739
  socket->fd = -1;
740
  socket->server = NULL;
741 90fc8d36 Ryan
#if EV_MULTIPLICITY
742 40c0f755 Ryan
  socket->loop = NULL;
743 90fc8d36 Ryan
#endif
744
  socket->attached = FALSE;
745 40c0f755 Ryan
  socket->connected = FALSE;
746
747
  oi_queue_init(&socket->out_stream);
748
749
  ev_init (&socket->write_watcher, on_io_event);
750
  socket->write_watcher.data = socket;
751
752
  ev_init(&socket->read_watcher, on_io_event);
753
  socket->read_watcher.data = socket;
754
755
  socket->secure = FALSE;
756
  socket->wait_for_secure_hangup = FALSE;
757
#if HAVE_GNUTLS
758
  socket->session = NULL;
759
#endif 
760
761
  /* TODO higher resolution timer */
762
  ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout);
763
  socket->timeout_watcher.data = socket;  
764
765
  socket->read_action = NULL;
766
  socket->write_action = NULL;
767
768
  socket->chunksize = TCP_MAXWIN; 
769
  socket->on_connect = NULL;
770
  socket->on_read = NULL;
771
  socket->on_drain = NULL;
772
  socket->on_error = NULL;
773
  socket->on_timeout = NULL;
774
}
775
776
void 
777
oi_socket_write_eof (oi_socket *socket)
778
{
779
#if HAVE_GNUTLS
780
  /* try to hang up properly for secure connections */
781
  if(socket->secure) 
782
  {
783
    if( socket->connected /* completed handshake */ 
784
     && socket->write_action /* write end is open */
785
      ) 
786
    {
787
      socket->write_action = secure_half_goodbye;
788 90fc8d36 Ryan
      if(socket->attached)
789
        ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
790 40c0f755 Ryan
      return;
791
    }
792
    /* secure servers cannot handle half-closed connections? */
793
    full_close(socket); 
794
    return;
795
  }
796
#endif // HAVE_GNUTLS
797
798
  if(socket->write_action)
799
    half_close(socket);
800
  else
801
    full_close(socket);
802
}
803
804
void 
805
oi_socket_close (oi_socket *socket)
806
{
807
#if HAVE_GNUTLS
808
  /* try to hang up properly for secure connections */
809
  if( socket->secure 
810
   && socket->connected /* completed handshake */ 
811
   && socket->write_action /* write end is open */
812
    ) 
813
  {
814
    if(socket->wait_for_secure_hangup && socket->read_action) {
815
      socket->write_action = secure_full_goodbye;
816
      socket->read_action = secure_full_goodbye;
817
    } else {
818
      socket->write_action = secure_half_goodbye;
819
      socket->read_action = NULL;
820
    }
821
822 90fc8d36 Ryan
    if(socket->attached)
823
      ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
824 40c0f755 Ryan
825
    return;
826
  }
827
#endif // HAVE_GNUTLS
828
829
  full_close(socket);
830
}
831
832
/* 
833
 * Resets the timeout to stay alive for another socket->timeout seconds
834
 */
835
void 
836
oi_socket_reset_timeout(oi_socket *socket)
837
{
838 90fc8d36 Ryan
  ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
839 40c0f755 Ryan
}
840
841
/**
842
 * Writes a string to the socket. This is actually sets a watcher which may
843
 * take multiple iterations to write the entire string.
844
 */
845
void 
846
oi_socket_write(oi_socket *socket, oi_buf *buf)
847
{
848
  if(socket->write_action == NULL)
849
    return;
850
851
  oi_queue_insert_head(&socket->out_stream, &buf->queue);
852
853
  buf->written = 0;
854 90fc8d36 Ryan
  // XXX if (socket->attached)   ??
855
  ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
856 40c0f755 Ryan
}
857
858
static void
859
free_simple_buf ( oi_buf *buf )
860
{
861
  free(buf->base);
862
  free(buf);
863
}
864
865
/* Writes a string to the socket. 
866
 * NOTE: Allocates memory. Avoid for performance applications.
867
 */ 
868
void
869
oi_socket_write_simple(oi_socket *socket, const char *str, size_t len)
870
{
871
  oi_buf *buf = malloc(sizeof(oi_buf));
872
  buf->release = free_simple_buf;
873
  buf->base = strdup(str);
874
  buf->len = len;
875
876
  oi_socket_write(socket, buf);
877
}
878
879
void
880 90fc8d36 Ryan
oi_socket_attach(EV_P_ oi_socket *socket)
881 40c0f755 Ryan
{
882 90fc8d36 Ryan
#if EV_MULTIPLICITY
883 40c0f755 Ryan
  socket->loop = loop;
884 90fc8d36 Ryan
#endif 
885
  socket->attached = TRUE;
886 40c0f755 Ryan
887 90fc8d36 Ryan
  ev_timer_again(EV_A_ &socket->timeout_watcher);
888 40c0f755 Ryan
889
  if(socket->read_action) 
890 90fc8d36 Ryan
    ev_io_start(EV_A_ &socket->read_watcher);
891 40c0f755 Ryan
892
  if(socket->write_action) 
893 90fc8d36 Ryan
    ev_io_start(EV_A_ &socket->write_watcher);
894 40c0f755 Ryan
895
  /* make sure the io_event happens soon  in the case we're being reattached */
896 90fc8d36 Ryan
  ev_feed_event(EV_A_ &socket->read_watcher, EV_READ);
897 40c0f755 Ryan
}
898
899
void
900
oi_socket_detach(oi_socket *socket)
901
{
902 90fc8d36 Ryan
  if(socket->attached) {
903
    ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
904
    ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
905
    ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
906
#if EV_MULTIPLICITY
907 40c0f755 Ryan
    socket->loop = NULL;
908 90fc8d36 Ryan
#endif
909
    socket->attached = FALSE;
910 40c0f755 Ryan
  }
911
}
912
913
void
914
oi_socket_read_stop (oi_socket *socket)
915
{
916 90fc8d36 Ryan
  ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
917
  ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher);
918 40c0f755 Ryan
}
919
920
void
921
oi_socket_read_start (oi_socket *socket)
922
{
923
  if(socket->read_action) {
924 90fc8d36 Ryan
    ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
925 40c0f755 Ryan
    /* XXX feed event? */
926
  }
927
}
928
929
int
930
oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
931
{
932
  int fd = socket( addrinfo->ai_family
933
                 , addrinfo->ai_socktype
934
                 , addrinfo->ai_protocol
935
                 );
936
  if(fd < 0) {
937
    perror("socket()");
938
    return -1;
939
  }
940
941
  int flags = fcntl(fd, F_GETFL, 0);
942
  int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
943
  if(r < 0) {
944
    perror("fcntl()");
945
    return -1;
946
  }
947
      
948
#ifdef SO_NOSIGPIPE
949
  flags = 1;
950
  setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
951
#endif
952
953
  r = connect( fd
954
             , addrinfo->ai_addr
955
             , addrinfo->ai_addrlen
956
             );
957
958
  if(r < 0 && errno != EINPROGRESS) {
959
    perror("connect");
960
    close(fd);
961
    return -1;
962
  }
963
964
  assign_file_descriptor(s, fd);
965
966
  return 0;
967
}