The data contained in this repository can be downloaded to your computer using one of several clients.
Please see the documentation of your version control software client for more information.

Please select the desired protocol below to get the URL.

This URL has Read-Only access.

Statistics
| Branch: | Revision:

main_repo / deps / liboi / oi_socket.c @ 90fc8d36

History | View | Annotate | Download (21.6 KB)

1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <assert.h>
4
#include <unistd.h> /* close() */
5
#include <fcntl.h>  /* fcntl() */
6
#include <errno.h> /* for the default methods */
7
#include <string.h> /* memset */
8

    
9
#include <netinet/tcp.h> /* TCP_NODELAY */
10

    
11
#include <ev.h>
12
#include <oi_socket.h>
13

    
14
#if EV_MULTIPLICITY
15
# define SOCKET_LOOP_ socket->loop, 
16
# define SERVER_LOOP_ server->loop, 
17
#else
18
# define SOCKET_LOOP_ 
19
# define SERVER_LOOP_
20
#endif // EV_MULTIPLICITY
21

    
22
#if HAVE_GNUTLS
23
# include <gnutls/gnutls.h>
24
# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1)
25
# define GNUTLS_NEED_READ (gnutls_record_get_direction(socket->session) == 0)
26
#endif
27

    
28
#undef TRUE
29
#define TRUE 1
30
#undef FALSE
31
#define FALSE 0
32
#undef MIN
33
#define MIN(a,b) ((a) < (b) ? (a) : (b))
34

    
35
#define OKAY  0
36
#define AGAIN 1
37
#define ERROR 2 
38

    
39
#define RAISE_ERROR(s, _domain, _code) do { \
40
  if(s->on_error) { \
41
    struct oi_error __oi_error; \
42
    __oi_error.domain = _domain; \
43
    __oi_error.code = _code; \
44
    s->on_error(s, __oi_error); \
45
  } \
46
} while(0) \
47

    
48
static int 
49
full_close(oi_socket *socket)
50
{
51
  if(-1 == close(socket->fd) && errno == EINTR) {
52
    /* TODO fd still open. next loop call close again? */
53
    assert(0 && "implement me");  
54
    return ERROR;
55
  }
56

    
57
  socket->read_action = NULL;
58
  socket->write_action = NULL;
59

    
60
  if(socket->attached) {
61
    ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ);
62
  }
63
  return OKAY;
64
}
65

    
66
static int 
67
half_close(oi_socket *socket)
68
{
69
  int r = shutdown(socket->fd, SHUT_WR);
70

    
71
  if(r == -1) {
72
    RAISE_ERROR(socket, OI_ERROR_SHUTDOWN, errno);
73
    return ERROR;
74
  }
75

    
76
  socket->write_action = NULL;
77

    
78
  /* TODO set timer to zero  so we get a callback soon */
79
  return OKAY;
80
}
81

    
82
static void
83
update_write_buffer_after_send(oi_socket *socket, ssize_t sent)
84
{
85
  oi_queue *q = oi_queue_last(&socket->out_stream);
86
  oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
87
  to_write->written += sent;
88
  socket->written += sent;
89

    
90
  if(to_write->written == to_write->len) {
91

    
92
    oi_queue_remove(q);
93

    
94
    if(to_write->release) {
95
      to_write->release(to_write);
96
    }  
97

    
98
    if(oi_queue_empty(&socket->out_stream)) {
99
      ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
100
      if(socket->on_drain)
101
        socket->on_drain(socket);
102
    }
103
  }
104
}
105

    
106

    
107
#if HAVE_GNUTLS
108
static int secure_socket_send(oi_socket *socket);
109
static int secure_socket_recv(oi_socket *socket);
110

    
111
/* TODO can this be done without ignoring SIGPIPE?  */
112
static ssize_t 
113
nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len)
114
{
115
  oi_socket *socket = (oi_socket*)data;
116
  assert(socket->secure);
117
  int flags = 0;
118
#ifdef MSG_NOSIGNAL
119
  flags |= MSG_NOSIGNAL;
120
#endif
121
#ifdef MSG_DONTWAIT
122
  flags |= MSG_DONTWAIT;
123
#endif
124
  int r = send(socket->fd, buf, len, flags);
125

    
126
  if(r == -1) {
127
    gnutls_transport_set_errno(socket->session, errno); /* necessary ? */
128
  }
129

    
130
  return r;
131
}
132

    
133
static int
134
secure_handshake(oi_socket *socket)
135
{
136
  assert(socket->secure);
137

    
138
  int r = gnutls_handshake(socket->session);
139

    
140
  if(gnutls_error_is_fatal(r)) {
141
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
142
    return ERROR;
143
  }
144

    
145
  if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
146
    return AGAIN;
147

    
148
  oi_socket_reset_timeout(socket);
149

    
150
  if(!socket->connected) {
151
    socket->connected = TRUE;
152
    if(socket->on_connect)
153
      socket->on_connect(socket);
154
  }
155

    
156
  if(socket->read_action)
157
    socket->read_action = secure_socket_recv;
158
 
159
  if(socket->write_action)
160
    socket->write_action = secure_socket_send;
161

    
162
  return OKAY;
163
}
164

    
165
static int
166
secure_socket_send(oi_socket *socket)
167
{
168
  ssize_t sent;
169

    
170
  if(oi_queue_empty(&socket->out_stream)) {
171
    ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
172
    return AGAIN;
173
  }
174

    
175
  oi_queue *q = oi_queue_last(&socket->out_stream);
176
  oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
177

    
178
  assert(socket->secure);
179

    
180
  sent = gnutls_record_send( socket->session
181
                           , to_write->base + to_write->written
182
                           , to_write->len - to_write->written
183
                           ); 
184

    
185
  if(gnutls_error_is_fatal(sent)) {
186
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, sent);
187
    return ERROR;
188
  }
189

    
190
  if(sent == 0)
191
    return AGAIN;
192

    
193
  oi_socket_reset_timeout(socket);
194

    
195
  if(sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
196
    if(GNUTLS_NEED_READ) {
197
      if(socket->read_action) {
198
        socket->read_action = secure_socket_send;
199
      } else {
200
        /* TODO GnuTLS needs read but already got EOF */
201
        assert(0 && "needs read but already got EOF");
202
        return ERROR;
203
      }
204
    }
205
    return AGAIN;
206
  }
207

    
208
  if(sent > 0) {
209
    /* make sure the callbacks are correct */
210
    if(socket->read_action)
211
      socket->read_action = secure_socket_recv;
212
    update_write_buffer_after_send(socket, sent);
213
    return OKAY;
214
  }
215

    
216
  assert(0 && "Unhandled return code from gnutls_record_send()!");
217
  return ERROR;
218
}
219

    
220
static int
221
secure_socket_recv(oi_socket *socket)
222
{
223
  char recv_buffer[TCP_MAXWIN];
224
  size_t recv_buffer_size = MIN(TCP_MAXWIN, socket->chunksize);
225
  ssize_t recved;
226

    
227
  assert(socket->secure);
228

    
229
  recved = gnutls_record_recv(socket->session, recv_buffer, recv_buffer_size);
230

    
231
  //printf("secure socket recv %d %p\n", recved, socket->on_connect);
232

    
233
  if(gnutls_error_is_fatal(recved)) {
234
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, recved);
235
    return ERROR;
236
  }
237

    
238
  if(recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN)  {
239
    if(GNUTLS_NEED_WRITE) {
240
      if(socket->write_action) {
241
        printf("need write\n");
242
        socket->write_action = secure_socket_recv;
243
      } else {
244
        /* TODO GnuTLS needs send but already closed write end */
245
        assert(0 && "needs read but cannot");
246
        return ERROR;
247
      }
248
    }
249
    return AGAIN;
250
  }
251

    
252
  oi_socket_reset_timeout(socket);
253

    
254
  /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
255
   * initiated a handshake. In that case the server can only initiate a
256
   * handshake or terminate the connection. */
257
  if(recved == GNUTLS_E_REHANDSHAKE) {
258
    if(socket->write_action) {
259
      socket->read_action = secure_handshake;
260
      socket->write_action = secure_handshake;
261
      return OKAY;
262
    } else {
263
      /* TODO */
264
      assert(0 && "needs read but cannot");
265
      return ERROR;
266
    }
267
  }
268

    
269
  if(recved >= 0) {
270
    /* Got EOF */
271
    if(recved == 0)
272
      socket->read_action = NULL;
273

    
274
    if(socket->write_action) 
275
      socket->write_action = secure_socket_send;
276

    
277
    if(socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
278

    
279
    return OKAY;
280
  }
281

    
282
  assert(0 && "Unhandled return code from gnutls_record_send()!");
283
  return ERROR;
284
}
285

    
286
static int
287
secure_goodbye(oi_socket *socket, gnutls_close_request_t how)
288
{
289
  assert(socket->secure);
290

    
291
  int r = gnutls_bye(socket->session, how);
292

    
293
  if(gnutls_error_is_fatal(r))  {
294
    RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
295
    return ERROR;
296
  }
297

    
298
  if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
299
    return AGAIN;
300

    
301
  return OKAY;
302
}
303

    
304
static int
305
secure_full_goodbye(oi_socket *socket)
306
{
307
  int r = secure_goodbye(socket, GNUTLS_SHUT_RDWR);
308
  if(OKAY == r) {
309
    return full_close(socket);
310
  }
311
  return r;
312
}
313

    
314
static int
315
secure_half_goodbye(oi_socket *socket)
316
{
317
  int r = secure_goodbye(socket, GNUTLS_SHUT_WR);
318
  if(OKAY == r) {
319
    return half_close(socket);
320
  }
321
  return r;
322
}
323

    
324
/* Tells the socket to use transport layer security (SSL). liboi does not
325
 * want to make any decisions about security requirements, so the
326
 * majoirty of GnuTLS configuration is left to the user. Only the transport
327
 * layer of GnuTLS is controlled by liboi.
328
 *
329
 * That is, do not use gnutls_transport_* functions. 
330
 * Do use the rest of GnuTLS's API.
331
 */
332
void
333
oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
334
{
335
  socket->session = session;
336
  socket->secure = TRUE;
337
}
338
#endif /* HAVE GNUTLS */
339

    
340
static int
341
socket_send(oi_socket *socket)
342
{
343
  ssize_t sent;
344

    
345
  assert(socket->secure == FALSE);
346

    
347
  if(oi_queue_empty(&socket->out_stream)) {
348
    ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
349
    return AGAIN;
350
  }
351

    
352
  oi_queue *q = oi_queue_last(&socket->out_stream);
353
  oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
354
  
355
  int flags = 0;
356
#ifdef MSG_NOSIGNAL
357
  flags |= MSG_NOSIGNAL;
358
#endif
359
#ifdef MSG_DONTWAIT
360
  flags |= MSG_DONTWAIT;
361
#endif
362

    
363
  /* TODO use writev() here */
364

    
365
  sent = send( socket->fd
366
             , to_write->base + to_write->written
367
             , to_write->len - to_write->written
368
             , flags
369
             );
370

    
371
  if(sent < 0) {
372
    switch(errno) {
373
      case EAGAIN:
374
        return AGAIN;
375

    
376
      case ECONNREFUSED:
377
      case ECONNRESET:
378
        socket->write_action = NULL;
379
        /* TODO maybe just clear write buffer instead of error? 
380
         * They should be able to read still from the socket. 
381
         */
382
        RAISE_ERROR(socket, OI_ERROR_SEND, errno);
383
        return ERROR;
384

    
385
      default:
386
        perror("send()");
387
        assert(0 && "oi shouldn't let this happen.");
388
        return ERROR;
389
    }
390
  }
391

    
392
  oi_socket_reset_timeout(socket);
393

    
394
  if(!socket->connected) {
395
    socket->connected = TRUE;
396
    if(socket->on_connect) { socket->on_connect(socket); }
397
  }
398

    
399
  update_write_buffer_after_send(socket, sent);
400

    
401
  return OKAY;
402
}
403

    
404
static int
405
socket_recv(oi_socket *socket)
406
{
407
  char buf[TCP_MAXWIN];
408
  size_t buf_size = TCP_MAXWIN;
409
  ssize_t recved;
410

    
411
  assert(socket->secure == FALSE);
412

    
413
  if(!socket->connected) {
414
    socket->connected = TRUE;
415
    if(socket->on_connect) { socket->on_connect(socket); }
416
    return OKAY;
417
  }
418

    
419
  recved = recv(socket->fd, buf, buf_size, 0);
420

    
421
  if(recved < 0) {
422
    switch(errno) {
423
      case EAGAIN: 
424
      case EINTR:  
425
        return AGAIN;
426

    
427
      /* A remote host refused to allow the network connection (typically
428
       * because it is not running the requested service). */
429
      case ECONNREFUSED:
430
        RAISE_ERROR(socket, OI_ERROR_RECV, errno);
431
        return ERROR; 
432

    
433
      case ECONNRESET:
434
        RAISE_ERROR(socket, OI_ERROR_RECV, errno);
435
        return ERROR; 
436

    
437
      default:
438
        perror("recv()");
439
        printf("unmatched errno %d\n", errno);
440
        assert(0 && "recv returned error that oi should have caught before.");
441
        return ERROR;
442
    }
443
  }
444

    
445
  oi_socket_reset_timeout(socket);
446

    
447
  if(recved == 0) {
448
    oi_socket_read_stop(socket);
449
    socket->read_action = NULL;
450
  }
451

    
452
  /* NOTE: EOF is signaled with recved == 0 on callback */
453
  if(socket->on_read) { socket->on_read(socket, buf, recved); }
454

    
455
  return OKAY;
456
}
457

    
458
static void
459
assign_file_descriptor(oi_socket *socket, int fd)
460
{
461
  socket->fd = fd;
462

    
463
  ev_io_set (&socket->read_watcher, fd, EV_READ);
464
  ev_io_set (&socket->write_watcher, fd, EV_WRITE);
465

    
466
  socket->read_action = socket_recv;
467
  socket->write_action = socket_send;
468

    
469
#if HAVE_GNUTLS
470
  if(socket->secure) {
471
    gnutls_transport_set_lowat(socket->session, 0); 
472
    gnutls_transport_set_push_function(socket->session, nosigpipe_push);
473
    gnutls_transport_set_ptr2 ( socket->session
474
                 /* recv */   , (gnutls_transport_ptr_t)fd 
475
                 /* send */   , socket 
476
                              );
477
    socket->read_action = secure_handshake;
478
    socket->write_action = secure_handshake;
479
  }
480
#endif 
481
}
482

    
483

    
484
/* Internal callback 
485
 * Called by server->connection_watcher.
486
 */
487
static void 
488
on_connection(EV_P_ ev_io *watcher, int revents)
489
{
490
  oi_server *server = watcher->data;
491

    
492
 // printf("on connection!\n");
493

    
494
  assert(server->listening);
495
#if EV_MULTIPLICITY
496
  assert(server->loop == loop);
497
#endif 
498
  assert(&server->connection_watcher == watcher);
499
  
500
  if(EV_ERROR & revents) {
501
    oi_server_close(server);
502
    return;
503
  }
504
  
505
  struct sockaddr address; /* connector's address information */
506
  socklen_t addr_len = sizeof(address);
507
  
508
  /* TODO accept all possible connections? currently: just one */
509
  int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len);
510
  if(fd < 0) {
511
    perror("accept()");
512
    return;
513
  }
514

    
515
  oi_socket *socket = NULL;
516
  if(server->on_connection)
517
    socket = server->on_connection(server, (struct sockaddr*)&address, addr_len);
518

    
519
  if(socket == NULL) {
520
    close(fd);
521
    return;
522
  } 
523
  
524
  int flags = fcntl(fd, F_GETFL, 0);
525
  int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
526
  if(r < 0) {
527
    /* TODO error report */
528
  }
529
  
530
#ifdef SO_NOSIGPIPE
531
  flags = 1;
532
  setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
533
#endif
534

    
535
  socket->server = server;
536
  assign_file_descriptor(socket, fd);
537
  oi_socket_attach(EV_A_ socket);
538
}
539

    
540
int
541
oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
542
{
543
  int fd = -1;
544
  struct linger ling = {0, 0};
545
  assert(server->listening == FALSE);
546

    
547
  fd = socket( addrinfo->ai_family
548
             , addrinfo->ai_socktype
549
             , addrinfo->ai_protocol
550
             );
551
  if(fd < 0) {
552
    perror("socket()");
553
    return -1;
554
  }
555

    
556
  int flags = fcntl(fd, F_GETFL, 0);
557
  int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
558
  if(r < 0) {
559
    perror("fcntl()");
560
    return -1;
561
  }
562

    
563
  flags = 1;
564
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
565
  setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
566
  setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
567

    
568
  /* XXX: Sending single byte chunks in a response body? Perhaps there is a
569
   * need to enable the Nagel algorithm dynamically. For now disabling.
570
   */
571
  //setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
572

    
573
  if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0) {
574
    perror("bind()");
575
    close(fd);
576
    return -1;
577
  }
578
  
579
  if (listen(fd, server->backlog) < 0) {
580
    perror("listen()");
581
    close(fd);
582
    return -1;
583
  }
584
  
585
  server->fd = fd;
586
  server->listening = TRUE;
587
  ev_io_set (&server->connection_watcher, server->fd, EV_READ);
588
  
589
  return 0;
590
}
591

    
592
/**
593
 * Stops the server. Will not accept new connections.  Does not drop
594
 * existing connections.
595
 */
596
void 
597
oi_server_close(oi_server *server)
598
{
599
  if(server->listening) {
600
    oi_server_detach(server);
601
    close(server->fd);
602
    /* TODO do this on the loop? check return value? */
603
    server->listening = FALSE;
604
  }
605
}
606

    
607
void
608
oi_server_attach (EV_P_ oi_server *server)
609
{
610
  ev_io_start (EV_A_ &server->connection_watcher);
611
#if EV_MULTIPLICITY
612
  server->loop = loop;
613
#endif
614
  server->attached = TRUE;
615
}
616

    
617
void
618
oi_server_detach (oi_server *server)
619
{
620
  ev_io_stop (SERVER_LOOP_ &server->connection_watcher);
621
#if EV_MULTIPLICITY
622
  server->loop = NULL;
623
#endif
624
  server->attached = FALSE;
625
}
626

    
627
void 
628
oi_server_init(oi_server *server, int backlog)
629
{
630
  server->backlog = backlog;
631
  server->attached = FALSE;
632
  server->listening = FALSE;
633
  server->fd = -1;
634
  server->connection_watcher.data = server;
635
  ev_init (&server->connection_watcher, on_connection);
636

    
637
  server->on_connection = NULL;
638
  server->on_error = NULL;
639
  server->data = NULL;
640
}
641

    
642
/* Internal callback. called by socket->timeout_watcher */
643
static void 
644
on_timeout(EV_P_ ev_timer *watcher, int revents)
645
{
646
  oi_socket *socket = watcher->data;
647

    
648
  assert(watcher == &socket->timeout_watcher);
649

    
650
 // printf("on_timeout\n");
651

    
652
  if(socket->on_timeout) { socket->on_timeout(socket); }
653

    
654

    
655
  /* TODD set timer to zero */
656
  full_close(socket);
657
}
658

    
659
static void
660
release_write_buffer(oi_socket *socket)
661
{
662
  while(!oi_queue_empty(&socket->out_stream)) {
663
    oi_queue *q = oi_queue_last(&socket->out_stream);
664
    oi_buf *buf = oi_queue_data(q, oi_buf, queue);
665
    oi_queue_remove(q);
666
    if(buf->release) { buf->release(buf); }
667
  }
668
}
669

    
670
/* Internal callback. called by socket->read_watcher */
671
static void 
672
on_io_event(EV_P_ ev_io *watcher, int revents)
673
{
674
  oi_socket *socket = watcher->data;
675

    
676
  if(revents & EV_ERROR) {
677
    RAISE_ERROR(socket, OI_ERROR_EV, 0);
678
    goto close;
679
  }
680

    
681
  int r;
682
  int have_read_event = TRUE;
683
  int have_write_event = TRUE;
684

    
685
  while(have_read_event || have_write_event) {
686

    
687
    if(socket->read_action) {
688
      r = socket->read_action(socket);
689
      if(r == ERROR) goto close;
690
      if(r == AGAIN) have_read_event = FALSE;
691
    } else {
692
      have_read_event = FALSE;
693
    }
694

    
695
    if(socket->write_action) {
696
      r = socket->write_action(socket);
697
      if(r == ERROR) goto close;
698
      if(r == AGAIN) have_write_event = FALSE;
699
    } else {
700
      have_write_event = FALSE;
701
    }
702

    
703

    
704
    if(socket->read_watcher.active == FALSE)
705
      have_read_event = FALSE;
706
    if(socket->write_watcher.active == FALSE)
707
      have_write_event = FALSE;
708
  }
709

    
710
  if(socket->write_action == NULL && socket->read_action == NULL)
711
    goto close;
712

    
713
  return;
714

    
715
close:
716
  release_write_buffer(socket);
717

    
718
  ev_clear_pending (EV_A_ &socket->write_watcher);
719
  ev_clear_pending (EV_A_ &socket->read_watcher);
720
  ev_clear_pending (EV_A_ &socket->timeout_watcher);
721

    
722
  oi_socket_detach(socket);
723

    
724
  if(socket->on_close) { socket->on_close(socket); }
725
  /* WARNING: user can free socket in on_close so no more 
726
   * access beyond this point. */
727
}
728

    
729
/**
730
 * If using SSL do consider setting
731
 *   gnutls_db_set_retrieve_function (socket->session, _);
732
 *   gnutls_db_set_remove_function (socket->session, _);
733
 *   gnutls_db_set_store_function (socket->session, _);
734
 *   gnutls_db_set_ptr (socket->session, _);
735
 */
736
void 
737
oi_socket_init(oi_socket *socket, float timeout)
738
{
739
  socket->fd = -1;
740
  socket->server = NULL;
741
#if EV_MULTIPLICITY
742
  socket->loop = NULL;
743
#endif
744
  socket->attached = FALSE;
745
  socket->connected = FALSE;
746

    
747
  oi_queue_init(&socket->out_stream);
748

    
749
  ev_init (&socket->write_watcher, on_io_event);
750
  socket->write_watcher.data = socket;
751

    
752
  ev_init(&socket->read_watcher, on_io_event);
753
  socket->read_watcher.data = socket;
754

    
755
  socket->secure = FALSE;
756
  socket->wait_for_secure_hangup = FALSE;
757
#if HAVE_GNUTLS
758
  socket->session = NULL;
759
#endif 
760

    
761
  /* TODO higher resolution timer */
762
  ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout);
763
  socket->timeout_watcher.data = socket;  
764

    
765
  socket->read_action = NULL;
766
  socket->write_action = NULL;
767

    
768
  socket->chunksize = TCP_MAXWIN; 
769
  socket->on_connect = NULL;
770
  socket->on_read = NULL;
771
  socket->on_drain = NULL;
772
  socket->on_error = NULL;
773
  socket->on_timeout = NULL;
774
}
775

    
776
void 
777
oi_socket_write_eof (oi_socket *socket)
778
{
779
#if HAVE_GNUTLS
780
  /* try to hang up properly for secure connections */
781
  if(socket->secure) 
782
  {
783
    if( socket->connected /* completed handshake */ 
784
     && socket->write_action /* write end is open */
785
      ) 
786
    {
787
      socket->write_action = secure_half_goodbye;
788
      if(socket->attached)
789
        ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
790
      return;
791
    }
792
    /* secure servers cannot handle half-closed connections? */
793
    full_close(socket); 
794
    return;
795
  }
796
#endif // HAVE_GNUTLS
797

    
798
  if(socket->write_action)
799
    half_close(socket);
800
  else
801
    full_close(socket);
802
}
803

    
804
void 
805
oi_socket_close (oi_socket *socket)
806
{
807
#if HAVE_GNUTLS
808
  /* try to hang up properly for secure connections */
809
  if( socket->secure 
810
   && socket->connected /* completed handshake */ 
811
   && socket->write_action /* write end is open */
812
    ) 
813
  {
814
    if(socket->wait_for_secure_hangup && socket->read_action) {
815
      socket->write_action = secure_full_goodbye;
816
      socket->read_action = secure_full_goodbye;
817
    } else {
818
      socket->write_action = secure_half_goodbye;
819
      socket->read_action = NULL;
820
    }
821

    
822
    if(socket->attached)
823
      ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
824

    
825
    return;
826
  }
827
#endif // HAVE_GNUTLS
828

    
829
  full_close(socket);
830
}
831

    
832
/* 
833
 * Resets the timeout to stay alive for another socket->timeout seconds
834
 */
835
void 
836
oi_socket_reset_timeout(oi_socket *socket)
837
{
838
  ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
839
}
840

    
841
/**
842
 * Writes a string to the socket. This is actually sets a watcher which may
843
 * take multiple iterations to write the entire string.
844
 */
845
void 
846
oi_socket_write(oi_socket *socket, oi_buf *buf)
847
{
848
  if(socket->write_action == NULL)
849
    return;
850

    
851
  oi_queue_insert_head(&socket->out_stream, &buf->queue);
852

    
853
  buf->written = 0;
854
  // XXX if (socket->attached)   ??
855
  ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
856
}
857

    
858
static void
859
free_simple_buf ( oi_buf *buf )
860
{
861
  free(buf->base);
862
  free(buf);
863
}
864

    
865
/* Writes a string to the socket. 
866
 * NOTE: Allocates memory. Avoid for performance applications.
867
 */ 
868
void
869
oi_socket_write_simple(oi_socket *socket, const char *str, size_t len)
870
{
871
  oi_buf *buf = malloc(sizeof(oi_buf));
872
  buf->release = free_simple_buf;
873
  buf->base = strdup(str);
874
  buf->len = len;
875

    
876
  oi_socket_write(socket, buf);
877
}
878

    
879
void
880
oi_socket_attach(EV_P_ oi_socket *socket)
881
{
882
#if EV_MULTIPLICITY
883
  socket->loop = loop;
884
#endif 
885
  socket->attached = TRUE;
886

    
887
  ev_timer_again(EV_A_ &socket->timeout_watcher);
888

    
889
  if(socket->read_action) 
890
    ev_io_start(EV_A_ &socket->read_watcher);
891

    
892
  if(socket->write_action) 
893
    ev_io_start(EV_A_ &socket->write_watcher);
894

    
895
  /* make sure the io_event happens soon  in the case we're being reattached */
896
  ev_feed_event(EV_A_ &socket->read_watcher, EV_READ);
897
}
898

    
899
void
900
oi_socket_detach(oi_socket *socket)
901
{
902
  if(socket->attached) {
903
    ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
904
    ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
905
    ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
906
#if EV_MULTIPLICITY
907
    socket->loop = NULL;
908
#endif
909
    socket->attached = FALSE;
910
  }
911
}
912

    
913
void
914
oi_socket_read_stop (oi_socket *socket)
915
{
916
  ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
917
  ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher);
918
}
919

    
920
void
921
oi_socket_read_start (oi_socket *socket)
922
{
923
  if(socket->read_action) {
924
    ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
925
    /* XXX feed event? */
926
  }
927
}
928

    
929
int
930
oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
931
{
932
  int fd = socket( addrinfo->ai_family
933
                 , addrinfo->ai_socktype
934
                 , addrinfo->ai_protocol
935
                 );
936
  if(fd < 0) {
937
    perror("socket()");
938
    return -1;
939
  }
940

    
941
  int flags = fcntl(fd, F_GETFL, 0);
942
  int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
943
  if(r < 0) {
944
    perror("fcntl()");
945
    return -1;
946
  }
947
      
948
#ifdef SO_NOSIGPIPE
949
  flags = 1;
950
  setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
951
#endif
952

    
953
  r = connect( fd
954
             , addrinfo->ai_addr
955
             , addrinfo->ai_addrlen
956
             );
957

    
958
  if(r < 0 && errno != EINPROGRESS) {
959
    perror("connect");
960
    close(fd);
961
    return -1;
962
  }
963

    
964
  assign_file_descriptor(s, fd);
965

    
966
  return 0;
967
}
968