The data contained in this repository can be downloaded to your computer using one of several clients.
Please see the documentation of your version control software client for more information.

Please select the desired protocol below to get the URL.

This URL has Read-Only access.

Statistics
| Branch: | Revision:

main_repo / deps / liboi / oi_file.c @ 40c0f755

History | View | Annotate | Download (7.65 KB)

1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <string.h>
4
#include <unistd.h>
5
#include <assert.h>
6
#include <errno.h>
7

    
8
#include <ev.h>
9
#include <oi.h>
10

    
11
#define RELEASE_BUF(buf) if(buf->release) { buf->release(buf); }
12
#define DRAIN_CB(file)   if(file->on_drain) { file->on_drain(file); }
13
#define RAISE_ERROR(s, _domain, _code) do { \
14
  if(s->on_error) { \
15
    struct oi_error __oi_error; \
16
    __oi_error.domain = _domain; \
17
    __oi_error.code = _code; \
18
    s->on_error(s, __oi_error); \
19
  } \
20
} while(0) \
21

    
22
/* forwards */
23
static void dispatch_write_buf (oi_file *file);
24
static void maybe_do_read (oi_file *file);
25

    
26
static void
27
after_read(oi_task *task, ssize_t recved)
28
{
29
  oi_file *file = task->data;
30

    
31
  if(recved == -1) {
32
    RAISE_ERROR(file, OI_ERROR_READ, errno);
33
    return;
34
  }
35

    
36
  if(recved == 0)
37
    oi_file_read_stop(file);
38

    
39
  if(file->on_read)
40
    file->on_read(file, recved);
41

    
42
  maybe_do_read(file);
43
}
44

    
45
static void
46
maybe_do_read(oi_file *file)
47
{
48
  if ( file->read_buffer == NULL
49
    || file->write_buf != NULL
50
    || file->write_socket != NULL
51
    || !oi_queue_empty(&file->write_queue)
52
    || file->io_task.active
53
     ) return;
54

    
55
  assert(file->fd > 0);
56

    
57
  oi_task_init_read ( &file->io_task
58
                    , after_read
59
                    , file->fd
60
                    , file->read_buffer
61
                    , file->read_buffer_size
62
                    );
63
  file->io_task.data = file;
64
  oi_async_submit(&file->async, &file->io_task);
65
}
66

    
67
static void 
68
submit_read (oi_file *file)
69
{
70
}
71

    
72
int 
73
oi_file_init (oi_file *file)
74
{
75
  oi_async_init(&file->async);
76
  file->async.data = file;
77

    
78
  oi_queue_init(&file->write_queue);
79

    
80
  file->fd = -1;
81
  file->loop = NULL;
82
  file->write_buf = NULL;
83
  file->read_buffer = NULL;
84

    
85
  file->on_open = NULL;
86
  file->on_read = NULL;
87
  file->on_drain = NULL;
88
  file->on_error = NULL;
89
  file->on_close = NULL;
90
  return 0;
91
}
92

    
93
void
94
oi_file_read_start (oi_file *file, void *buffer, size_t bufsize)
95
{
96
  file->read_buffer = buffer;
97
  file->read_buffer_size = bufsize;
98
  maybe_do_read(file);
99
}
100

    
101
void
102
oi_file_read_stop (oi_file *file)
103
{
104
  file->read_buffer = NULL;
105
}
106

    
107
void
108
oi_api_free_buf_with_heap_base(oi_buf *buf)
109
{
110
  free(buf->base);
111
  free(buf);
112
}
113

    
114
static void 
115
after_open(oi_task *task, int result)
116
{
117
  oi_file *file = task->data;
118

    
119
  if(result == -1) {
120
    RAISE_ERROR(file, OI_ERROR_OPEN, errno);
121
    return;
122
  }
123

    
124
  file->fd = result;
125

    
126
  if(file->on_open) {
127
    file->on_open(file);
128
  }
129

    
130
  maybe_do_read(file);
131
}
132

    
133
int
134
oi_file_open_path (oi_file *file, const char *path, int flags, mode_t mode)
135
{
136
  if(file->fd >= 0)
137
    return -1;
138
  oi_task_init_open( &file->io_task
139
                   , after_open
140
                   , path
141
                   , flags
142
                   , mode 
143
                   );
144
  file->io_task.data = file;
145
  oi_async_submit(&file->async, &file->io_task);
146
  return 0;
147
}
148

    
149
int
150
oi_file_open_stdin (oi_file *file)
151
{
152
  if(file->fd >= 0)
153
    return -1;
154
  file->fd = STDIN_FILENO;
155
  if(file->on_open)
156
    file->on_open(file);
157
  return 0;
158
}
159

    
160
int
161
oi_file_open_stdout (oi_file *file)
162
{
163
  if(file->fd >= 0)
164
    return -1;
165
  file->fd = STDOUT_FILENO;
166
  if(file->on_open)
167
    file->on_open(file);
168
  return 0;
169
}
170

    
171
int
172
oi_file_open_stderr (oi_file *file)
173
{
174
  if(file->fd >= 0)
175
    return -1;
176
  file->fd = STDERR_FILENO;
177
  if(file->on_open)
178
    file->on_open(file);
179
  return 0;
180
}
181

    
182
void
183
oi_file_attach (oi_file *file, struct ev_loop *loop)
184
{
185
  oi_async_attach (loop, &file->async);
186
  file->loop = loop;
187
}
188

    
189
void
190
oi_file_detach (oi_file *file)
191
{
192
  oi_async_detach (&file->async);
193
  file->loop = NULL;
194
}
195

    
196
static void 
197
after_write(oi_task *task, ssize_t result)
198
{
199
  oi_file *file = task->data;
200

    
201
  if(result == -1) {
202
    RAISE_ERROR(file, OI_ERROR_WRITE, errno);
203
    return;
204
  }
205

    
206
  assert(file->write_buf != NULL);
207
  oi_buf *buf = file->write_buf;
208

    
209
  buf->written += result;
210
  if(buf->written < buf->len) {
211
    oi_task_init_write ( &file->io_task
212
                       , after_write
213
                       , file->fd
214
                       , buf->base + buf->written
215
                       , buf->len - buf->written
216
                       );
217
    file->io_task.data = file;
218
    oi_async_submit(&file->async, &file->io_task);
219
    return;
220
  }
221

    
222
  assert(buf->written == buf->len);
223

    
224
  RELEASE_BUF(file->write_buf);
225
  file->write_buf = NULL;
226

    
227
  if(oi_queue_empty(&file->write_queue)) {
228
    DRAIN_CB(file);
229
    maybe_do_read(file);
230
  } else {
231
    dispatch_write_buf(file);
232
  }
233

    
234
  return;
235
}
236

    
237
static void
238
dispatch_write_buf(oi_file *file)
239
{
240
  if(file->write_buf != NULL)
241
    return;
242
  if(oi_queue_empty(&file->write_queue)) 
243
    return;
244

    
245
  oi_queue *q = oi_queue_last(&file->write_queue);
246
  oi_queue_remove(q);
247
  oi_buf *buf = file->write_buf = oi_queue_data(q, oi_buf, queue);
248

    
249
  assert(!file->io_task.active);
250
  oi_task_init_write ( &file->io_task
251
                     , after_write
252
                     , file->fd
253
                     , buf->base + buf->written
254
                     , buf->len - buf->written
255
                     );
256
  file->io_task.data = file;
257
  oi_async_submit(&file->async, &file->io_task);
258
}
259

    
260
int
261
oi_file_write (oi_file *file, oi_buf *buf)
262
{
263
  if(file->fd < 0)
264
    return -1;
265
  if(file->read_buffer)
266
    return -2;
267
  /* TODO better business check*/
268

    
269
  buf->written = 0;
270
  oi_queue_insert_head(&file->write_queue, &buf->queue);
271
  dispatch_write_buf(file);
272

    
273
  return 0;
274
}
275

    
276
// Writes a string to the file. 
277
// NOTE: Allocates memory. Avoid for performance applications.
278
int
279
oi_file_write_simple (oi_file *file, const char *str, size_t len)
280
{
281
  if(file->fd < 0)
282
    return -1;
283
  if(file->read_buffer)
284
    return -2;
285
  /* TODO better business check*/
286

    
287
  oi_buf *buf = malloc(sizeof(oi_buf));
288
  buf->base = malloc(len);
289
  memcpy(buf->base, str, len);
290
  buf->len = len;
291
  buf->release = oi_api_free_buf_with_heap_base;
292

    
293
  oi_file_write(file, buf);
294
  return 0;
295
}
296

    
297
static void
298
clear_write_queue(oi_file *file)
299
{
300
  while(!oi_queue_empty(&file->write_queue)) {
301
    oi_queue *q = oi_queue_last(&file->write_queue);
302
    oi_queue_remove(q);
303
    oi_buf *buf = oi_queue_data(q, oi_buf, queue);
304
    RELEASE_BUF(buf);
305
  }
306
}
307

    
308
static void 
309
after_close(oi_task *task, int result)
310
{
311
  oi_file *file = task->data;
312

    
313
  assert(oi_queue_empty(&file->write_queue));
314

    
315
  if(result == -1) {
316
    RAISE_ERROR(file, OI_ERROR_CLOSE, errno);
317
    return;
318
    // TODO try to close again? 
319
  }
320

    
321
  file->fd = -1;
322
  // TODO deinit task_queue, detach thread_pool_result_watcher 
323

    
324
  if(file->on_close) {
325
    file->on_close(file);
326
  }
327

    
328
  return;
329
}
330

    
331
void
332
oi_file_close (oi_file *file)
333
{
334
  assert(file->fd >= 0 && "file not open!");
335
  clear_write_queue(file);
336
  oi_task_init_close ( &file->io_task
337
                     , after_close
338
                     , file->fd
339
                     );
340
  file->io_task.data = file;
341
  oi_async_submit(&file->async, &file->io_task);
342
}
343

    
344
static void
345
after_sendfile(oi_task *task, ssize_t sent)
346
{
347
  oi_file *file = task->data;
348
  oi_socket *socket = file->write_socket;
349
  assert(socket != NULL);
350
  file->write_socket = NULL;
351

    
352
  if(sent == -1) {
353
    RAISE_ERROR(file, OI_ERROR_SENDFILE, errno);
354
    return;
355
  }
356

    
357
  if(socket->on_drain) {
358
    socket->on_drain(socket);
359
  }
360

    
361
  maybe_do_read(file);
362
}
363

    
364
int
365
oi_file_send (oi_file *file, oi_socket *destination, off_t offset, size_t count)
366
{
367
  if(file->fd < 0)
368
    return -1;
369
  if(file->read_buffer)
370
    return -2;
371
  /* TODO better business check*/
372

    
373
  assert(file->write_socket == NULL);
374
  // (1) make sure the write queue on the socket is cleared.
375
  // 
376
  // (2)
377
  // 
378
  file->write_socket = destination;
379
  oi_task_init_sendfile ( &file->io_task
380
                        , after_sendfile
381
                        , destination->fd
382
                        , file->fd
383
                        , offset
384
                        , count
385
                        );
386
  file->io_task.data = file;
387
  oi_async_submit(&file->async, &file->io_task);
388

    
389
  return 0;
390
}
391