deps/liboi/oi_async.c @ 40c0f755
#include <stdlib.h>   /* malloc() */
#include <stdio.h>    /* perror() */
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>   /* read(), write() */
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>

#if HAVE_SENDFILE
# if __linux
#  include <sys/sendfile.h>
# elif __freebsd
#  include <sys/socket.h>
#  include <sys/uio.h>
# elif __hpux
#  include <sys/socket.h>
# elif __solaris /* not yet */
#  include <sys/sendfile.h>
# else
#  error sendfile support requested but not available
# endif
#endif

#include <ev.h>
#include <oi.h>

#define NWORKERS 4
/* TODO: make NWORKERS adjustable
 * once sleeping_tasks is fixed.
 */

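/* Shared state of the worker pool: tasks queued for execution sit in
 * waiting_tasks (guarded by queue_lock); completed tasks are moved to the
 * owning oi_async's finished_tasks list (guarded by finished_lock); and
 * readiness_pipe carries one byte per queued task to wake a worker. */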
static int active_watchers = 0;
static int active_workers  = 0;
static int readiness_pipe[2] = {-1, -1};
static oi_queue waiting_tasks;
static pthread_mutex_t queue_lock    = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t finished_lock = PTHREAD_MUTEX_INITIALIZER;

struct worker {
  oi_task *task;
  pthread_t thread;
  pthread_attr_t thread_attr;
};

/* Sendfile and pread emulation come from Marc Lehmann's libeio and are
 * Copyright (C)2007,2008 Marc Alexander Lehmann.
 * Many ideas of oi_async.* are taken from libeio and in fact, I plan to
 * use libeio once it becomes usable for me. (The problem is issuing tasks
 * from multiple threads.)
 */

#if !HAVE_PREADWRITE
/*
 * make our pread/pwrite emulation safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
#endif

#if !HAVE_PREADWRITE
# undef pread
# undef pwrite
# define pread  eio__pread
# define pwrite eio__pwrite

static ssize_t
eio__pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock(&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock(&preadwritelock);

  return res;
}

static ssize_t
eio__pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock(&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET); /* restore the original offset, as in eio__pread */
  pthread_mutex_unlock(&preadwritelock);

  return res;
}
#endif


/* sendfile always needs emulation */
static ssize_t
eio__sendfile (int ofd, int ifd, off_t offset, size_t count)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes;

    vec.sfv_fd   = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off  = offset;
    vec.sfv_len  = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  if (res < 0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
     )
    {
      /* emulate sendfile. this is a major pain in the ass */
      /* buffer size for various temporary buffers */
#define EIO_BUFSIZE 65536
      char *eio_buf = malloc (EIO_BUFSIZE);
      errno = ENOMEM;
      if (!eio_buf)
        return -1;

      res = 0;

      while (count) {
        ssize_t cnt;

        cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);

        if (cnt <= 0) {
          if (cnt && !res) res = -1;
          break;
        }

        cnt = write (ofd, eio_buf, cnt);

        if (cnt <= 0) {
          if (cnt && !res) res = -1;
          break;
        }

        offset += cnt;
        res    += cnt;
        count  -= cnt;
      }

      free(eio_buf);
    }

  return res;
}

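/* Pop the oldest entry off a shared queue while holding its lock, or return
 * NULL if the queue is empty. Used by workers on waiting_tasks and by the
 * event loop on an async's finished_tasks. */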
static oi_task*
queue_shift(pthread_mutex_t *lock, oi_queue *queue)
{
  oi_queue *last = NULL;
  pthread_mutex_lock(lock);
  if(!oi_queue_empty(queue)) {
    last = oi_queue_last(queue);
    oi_queue_remove(last);
  }
  pthread_mutex_unlock(lock);

  if(last == NULL)
    return NULL;

  return oi_queue_data(last, oi_task, queue);
}

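/* P1..P4 expand to a call of the named function with one to four arguments
 * taken from the task's parameter struct, storing the return value in
 * t->params.<name>.result. Each expansion ends with a break so it can be
 * used directly as a case body in execute_task(). */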
#define P1(name,a) { \
  t->params.name.result = name( t->params.name.a ); \
  break; \
}

#define P2(name,a,b) { \
  t->params.name.result = name( t->params.name.a \
                              , t->params.name.b \
                              ); \
  break; \
}

#define P3(name,a,b,c) { \
  t->params.name.result = name( t->params.name.a \
                              , t->params.name.b \
                              , t->params.name.c \
                              ); \
  break; \
}

#define P4(name,a,b,c,d) { \
  t->params.name.result = name( t->params.name.a \
                              , t->params.name.b \
                              , t->params.name.c \
                              , t->params.name.d \
                              ); \
  break; \
}

static void
execute_task(oi_task *t)
{
  errno = 0;
  switch(t->type) {
    case OI_TASK_OPEN:        P3(open, pathname, flags, mode);
    case OI_TASK_READ:        P3(read, fd, buf, count);
    case OI_TASK_WRITE:       P3(write, fd, buf, count);
    case OI_TASK_CLOSE:       P1(close, fd);
    case OI_TASK_SLEEP:       P1(sleep, seconds);
    case OI_TASK_SENDFILE:    P4(eio__sendfile, out_fd, in_fd, offset, count);
    case OI_TASK_GETADDRINFO: P4(getaddrinfo, nodename, servname, hints, res);
    case OI_TASK_LSTAT:       P2(lstat, path, buf);
    default:
      assert(0 && "unknown task type");
      break;
  }
  t->errorno = errno;
}

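/* Each task queued by dispatch_tasks() is announced by a single byte on the
 * readiness pipe. A worker consumes one byte, pops one task, runs it, moves
 * it onto the owner's finished_tasks list, and signals the event loop via
 * ev_async_send(). The tail call at the end drains any remaining backlog. */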
static void
attempt_to_get_a_task(struct worker *worker)
{
  char dummy;
  assert(readiness_pipe[0] > 0);
  int r = read(readiness_pipe[0], &dummy, 1);
  if(r == -1 && errno != EAGAIN && errno != EINTR) {
    perror("read(readiness_pipe[0])");
    return;
  }

  // 1. pop a task from the queue
  assert(worker->task == NULL);
  oi_task *task = queue_shift(&queue_lock, &waiting_tasks);
  if(task == NULL) return;
  worker->task = task;

  // 2. run the task
  execute_task(task);

  // 3. notify completion
  oi_async *async = task->async;
  assert(async != NULL);
  pthread_mutex_lock(&finished_lock);
  oi_queue_insert_head(&async->finished_tasks, &task->queue);
  pthread_mutex_unlock(&finished_lock);
  ev_async_send(async->loop, &async->watcher);
  worker->task = NULL;

  /* attempt to pull another task */
  return attempt_to_get_a_task(worker);
}

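/* Worker thread entry point: block in select() on the read end of the
 * readiness pipe and try to pick up work whenever it becomes readable. */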
void *
worker_loop(void *data)
{
  int r;
  struct worker *worker = data;
  fd_set readfds;
  FD_ZERO(&readfds);
  FD_SET(readiness_pipe[0], &readfds);

  active_workers++;
  assert(active_workers <= NWORKERS);

  while(1) {
    r = select(1 + readiness_pipe[0], &readfds, 0, 0, 0);
    if(r == -1) break;
    attempt_to_get_a_task(worker);
  }
  active_workers--;

  return NULL;
}

static struct worker*
worker_new()
{
  int r;
  struct worker *worker = calloc(1, sizeof(struct worker));
  if(worker == NULL) { return NULL; }

  worker->task = NULL;
  pthread_attr_setdetachstate(&worker->thread_attr, PTHREAD_CREATE_DETACHED);

  r = pthread_create( &worker->thread
                    , NULL // &worker->thread_attr
                    , worker_loop
                    , worker
                    );
  if(r != 0) {
    /* TODO: error checking */
    perror("pthread_create");
    goto error;
  }

  return worker;
error:
  free(worker);
  return NULL;
}

static void
start_workers()
{
  assert(readiness_pipe[0] == -1);
  assert(readiness_pipe[1] == -1);
  assert(active_workers == 0);

  int r = pipe(readiness_pipe);
  if(r < 0) {
    perror("pipe()");
    assert(0 && "TODO HANDLE ME");
  }

  /* set the write end non-blocking */
  int flags = fcntl(readiness_pipe[1], F_GETFL, 0);
  r = fcntl(readiness_pipe[1], F_SETFL, flags | O_NONBLOCK);
  if(r < 0) {
    assert(0 && "error setting pipe to non-blocking?");
    /* TODO error report */
  }

  oi_queue_init(&waiting_tasks);

  int i;
  for(i = 0; i < NWORKERS; i++)
    worker_new();
}

/*
static void
stop_workers()
{
  assert(0 && "TODO implement me");
}
*/

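/* Runs on the event-loop thread when a worker fires the ev_async watcher:
 * drain the finished_tasks list, restore each task's saved errno, and invoke
 * the type-specific completion callback with the task's result. */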
static void
on_completion(struct ev_loop *loop, ev_async *watcher, int revents)
{
  oi_async *async = watcher->data;
  oi_task *task;

  while((task = queue_shift(&finished_lock, &async->finished_tasks))) {
    assert(task->active);
    task->active = 0;
    errno = task->errorno;
# define done_cb(kind) { \
    assert(task->params.kind.cb); \
    task->params.kind.cb(task, task->params.kind.result); \
    break; \
  }
    switch(task->type) {
      case OI_TASK_OPEN:        done_cb(open);
      case OI_TASK_READ:        done_cb(read);
      case OI_TASK_WRITE:       done_cb(write);
      case OI_TASK_CLOSE:       done_cb(close);
      case OI_TASK_SLEEP:       done_cb(sleep);
      case OI_TASK_SENDFILE:    done_cb(eio__sendfile);
      case OI_TASK_GETADDRINFO: done_cb(getaddrinfo);
      case OI_TASK_LSTAT:       done_cb(lstat);
    }
    /* the task is possibly freed by the callback. do not access it again. */
  }
}

void
oi_async_init (oi_async *async)
{
  ev_async_init(&async->watcher, on_completion);

  oi_queue_init(&async->finished_tasks);
  oi_queue_init(&async->new_tasks);

  async->watcher.data = async;
}

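/* Move tasks from the async's private new_tasks list onto the shared
 * waiting_tasks queue and write one byte per task to the readiness pipe so
 * that a sleeping worker wakes up. */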
static void
dispatch_tasks(oi_async *async)
{
  while(!oi_queue_empty(&async->new_tasks)) {
    oi_queue *last = oi_queue_last(&async->new_tasks);
    oi_queue_remove(last);
    oi_task *task = oi_queue_data(last, oi_task, queue);

    // 1. add task to task queue.
    pthread_mutex_lock(&queue_lock);
    oi_queue_insert_head(&waiting_tasks, &task->queue);
    pthread_mutex_unlock(&queue_lock);

    // 2. write byte to pipe
    char dummy;
    int written = write(readiness_pipe[1], &dummy, 1);

    // 3. TODO make sure byte is written
    assert(written == 1);
  }
}

void
oi_async_attach (struct ev_loop *loop, oi_async *async)
{
  if(active_watchers == 0 && active_workers == 0)
    start_workers();
  active_watchers++;

  ev_async_start(loop, &async->watcher);
  async->loop = loop;

  dispatch_tasks(async);
}

void
oi_async_detach (oi_async *async)
{
  if(async->loop == NULL)
    return;
  ev_async_stop(async->loop, &async->watcher);
  async->loop = NULL;
  active_watchers--;
  if(active_watchers == 0) {
    //stop_workers();
  }
}

void
oi_async_submit (oi_async *async, oi_task *task)
{
  assert(!task->active);
  assert(task->async == NULL);
  task->async = async;
  task->active = 1;

  oi_queue_insert_head(&async->new_tasks, &task->queue);
  if(ev_is_active(&async->watcher)) {
    dispatch_tasks(async);
  }
}