/*
 * libeio implementation
 *
 * Copyright (c) 2007,2008 Marc Alexander Lehmann <libeio@schmorp.de>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modifica-
 * tion, are permitted provided that the following conditions are met:
 *
 *   1.  Redistributions of source code must retain the above copyright notice,
 *       this list of conditions and the following disclaimer.
 *
 *   2.  Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Alternatively, the contents of this file may be used under the terms of
 * the GNU General Public License ("GPL") version 2 or any later version,
 * in which case the provisions of the GPL are applicable instead of
 * the above. If you wish to allow the use of your version of this file
 * only under the terms of the GPL and not to allow others to use your
 * version of this file under the BSD license, indicate your decision
 * by deleting the provisions above and replace them with the notice
 * and other provisions required by the GPL. If you do not delete the
 * provisions above, a recipient may use your version of this file under
 * either the BSD or the GPL.
 */

#include "eio.h"
#include "xthread.h"

#include <errno.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <limits.h>
#include <fcntl.h>
#include <assert.h>

#ifndef EIO_FINISH
# define EIO_FINISH(req)  ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
#endif

#ifndef EIO_DESTROY
# define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
#endif

#ifndef EIO_FEED
# define EIO_FEED(req)    do { if ((req)->feed   ) (req)->feed    (req); } while (0)
#endif

#ifdef _WIN32

  /*doh*/

#else

# include "config.h"
# include <sys/time.h>
# include <sys/select.h>
# include <unistd.h>
# include <utime.h>
# include <signal.h>
# include <dirent.h>

# ifndef EIO_STRUCT_DIRENT
#  define EIO_STRUCT_DIRENT struct dirent
# endif

#endif

#if HAVE_SENDFILE
# if __linux
#  include <sys/sendfile.h>
# elif __freebsd
#  include <sys/socket.h>
#  include <sys/uio.h>
# elif __hpux
#  include <sys/socket.h>
# elif __solaris /* not yet */
#  include <sys/sendfile.h>
# else
#  error sendfile support requested but not available
# endif
#endif

/* number of seconds after which idle threads exit */
#define IDLE_TIMEOUT 10

/* used for struct dirent, AIX doesn't provide it */
#ifndef NAME_MAX
# define NAME_MAX 4096
#endif

/* buffer size for various temporary buffers */
#define EIO_BUFSIZE 65536

#define dBUF                                    \
  char *eio_buf;                                \
  ETP_WORKER_LOCK (self);                       \
  self->dbuf = eio_buf = malloc (EIO_BUFSIZE);  \
  ETP_WORKER_UNLOCK (self);                     \
  errno = ENOMEM;                               \
  if (!eio_buf)                                 \
    return -1;

#define EIO_TICKS ((1000000 + 1023) >> 10)

/*****************************************************************************/

#if __GNUC__ >= 3
# define expect(expr,value) __builtin_expect ((expr),(value))
#else
# define expect(expr,value) (expr)
#endif

#define expect_false(expr) expect ((expr) != 0, 0)
#define expect_true(expr)  expect ((expr) != 0, 1)

/*****************************************************************************/

#define ETP_PRI_MIN EIO_PRI_MIN
#define ETP_PRI_MAX EIO_PRI_MAX

struct etp_worker;

#define ETP_REQ eio_req
#define ETP_DESTROY(req) eio_destroy (req)
static int eio_finish (eio_req *req);
#define ETP_FINISH(req)  eio_finish (req)
static void eio_execute (struct etp_worker *self, eio_req *req);
#define ETP_EXECUTE(wrk,req) eio_execute (wrk,req)

#define ETP_WORKER_CLEAR(req)   \
  if (wrk->dbuf)                \
    {                           \
      free (wrk->dbuf);         \
      wrk->dbuf = 0;            \
    }                           \
                                \
  if (wrk->dirp)                \
    {                           \
      closedir (wrk->dirp);     \
      wrk->dirp = 0;            \
    }
#define ETP_WORKER_COMMON \
  void *dbuf;             \
  DIR *dirp;

/*****************************************************************************/

#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* calculate time difference in ~1/EIO_TICKS of a second */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  return  (tv2->tv_sec  - tv1->tv_sec ) * EIO_TICKS
       + ((tv2->tv_usec - tv1->tv_usec) >> 10);
}
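
/* Note: EIO_TICKS is (1000000 + 1023) >> 10 = 977, so one tick is roughly a
 * millisecond. tvdiff counts whole seconds in EIO_TICKS units and
 * approximates the microsecond remainder with >> 10 (divide by 1024).
 * Worked example: tv1 = 1.000000 s and tv2 = 1.500000 s give
 * 0 * 977 + (500000 >> 10) = 488 ticks, i.e. about half a second. */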

static unsigned int started, idle, wanted = 4;

static void (*want_poll_cb) (void);
static void (*done_poll_cb) (void);

static unsigned int max_poll_time;     /* reslock */
static unsigned int max_poll_reqs;     /* reslock */

static volatile unsigned int nreqs;    /* reqlock */
static volatile unsigned int nready;   /* reqlock */
static volatile unsigned int npending; /* reqlock */
static volatile unsigned int max_idle = 4;

static mutex_t wrklock = X_MUTEX_INIT;
static mutex_t reslock = X_MUTEX_INIT;
static mutex_t reqlock = X_MUTEX_INIT;
static cond_t  reqwait = X_COND_INIT;

#if !HAVE_PREADWRITE
/*
 * make our pread/pwrite emulation safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static mutex_t preadwritelock = X_MUTEX_INIT;
#endif

typedef struct etp_worker
{
  /* locked by wrklock */
  struct etp_worker *prev, *next;

  thread_t tid;

  /* locked by reslock, reqlock or wrklock */
  ETP_REQ *req; /* currently processed request */

  ETP_WORKER_COMMON
} etp_worker;

static etp_worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */

#define ETP_WORKER_LOCK(wrk)   X_LOCK   (wrklock)
#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)

/* worker threads management */

static void etp_worker_clear (etp_worker *wrk)
{
  ETP_WORKER_CLEAR (wrk);
}

static void etp_worker_free (etp_worker *wrk)
{
  wrk->next->prev = wrk->prev;
  wrk->prev->next = wrk->next;

  free (wrk);
}

static unsigned int etp_nreqs (void)
{
  int retval;
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  retval = nreqs;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
  return retval;
}

static unsigned int etp_nready (void)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  retval = nready;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}

static unsigned int etp_npending (void)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  retval = npending;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}

static unsigned int etp_nthreads (void)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  retval = started;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}

/*
 * a somewhat faster data structure might be nice, but
 * with 8 priorities this actually needs <20 insns
 * per shift, the most expensive operation.
 */
typedef struct {
  ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
  int size;
} etp_reqq;

static etp_reqq req_queue;
static etp_reqq res_queue;

static int reqq_push (etp_reqq *q, ETP_REQ *req)
{
  int pri = req->pri;
  req->next = 0;

  if (q->qe[pri])
    {
      q->qe[pri]->next = req;
      q->qe[pri] = req;
    }
  else
    q->qe[pri] = q->qs[pri] = req;

  return q->size++;
}

static ETP_REQ *reqq_shift (etp_reqq *q)
{
  int pri;

  if (!q->size)
    return 0;

  --q->size;

  for (pri = ETP_NUM_PRI; pri--; )
    {
      eio_req *req = q->qs[pri];

      if (req)
        {
          if (!(q->qs[pri] = (eio_req *)req->next))
            q->qe[pri] = 0;

          return req;
        }
    }

  abort ();
}
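
/* Note: reqq_push returns the queue's previous size (0 when the queue was
 * empty, which is what triggers want_poll_cb for the result queue below),
 * and reqq_shift scans priorities from highest to lowest, so higher-priority
 * requests always overtake lower-priority ones. */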

static void etp_atfork_prepare (void)
{
  X_LOCK (wrklock);
  X_LOCK (reqlock);
  X_LOCK (reslock);
#if !HAVE_PREADWRITE
  X_LOCK (preadwritelock);
#endif
}

static void etp_atfork_parent (void)
{
#if !HAVE_PREADWRITE
  X_UNLOCK (preadwritelock);
#endif
  X_UNLOCK (reslock);
  X_UNLOCK (reqlock);
  X_UNLOCK (wrklock);
}

static void etp_atfork_child (void)
{
  ETP_REQ *prv;

  while ((prv = reqq_shift (&req_queue)))
    ETP_DESTROY (prv);

  while ((prv = reqq_shift (&res_queue)))
    ETP_DESTROY (prv);

  while (wrk_first.next != &wrk_first)
    {
      etp_worker *wrk = wrk_first.next;

      if (wrk->req)
        ETP_DESTROY (wrk->req);

      etp_worker_clear (wrk);
      etp_worker_free (wrk);
    }

  started  = 0;
  idle     = 0;
  nreqs    = 0;
  nready   = 0;
  npending = 0;

  etp_atfork_parent ();
}

static void
etp_once_init (void)
{
  X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
}

static int
etp_init (void (*want_poll)(void), void (*done_poll)(void))
{
  static pthread_once_t doinit = PTHREAD_ONCE_INIT;

  pthread_once (&doinit, etp_once_init);

  want_poll_cb = want_poll;
  done_poll_cb = done_poll;

  return 0;
}

X_THREAD_PROC (etp_proc);

static void etp_start_thread (void)
{
  etp_worker *wrk = calloc (1, sizeof (etp_worker));

  /*TODO*/
  assert (("unable to allocate worker thread data", wrk));

  X_LOCK (wrklock);

  if (thread_create (&wrk->tid, etp_proc, (void *)wrk))
    {
      wrk->prev = &wrk_first;
      wrk->next = wrk_first.next;
      wrk_first.next->prev = wrk;
      wrk_first.next = wrk;
      ++started;
    }
  else
    free (wrk);

  X_UNLOCK (wrklock);
}

static void etp_maybe_start_thread (void)
{
  if (expect_true (etp_nthreads () >= wanted))
    return;

  /* todo: maybe use idle here, but might be less exact */
  if (expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
    return;

  etp_start_thread ();
}

static void etp_end_thread (void)
{
  eio_req *req = calloc (1, sizeof (eio_req));

  req->type = -1;
  req->pri  = ETP_PRI_MAX - ETP_PRI_MIN;

  X_LOCK (reqlock);
  reqq_push (&req_queue, req);
  X_COND_SIGNAL (reqwait);
  X_UNLOCK (reqlock);

  X_LOCK (wrklock);
  --started;
  X_UNLOCK (wrklock);
}

static int etp_poll (void)
{
  unsigned int maxreqs;
  unsigned int maxtime;
  struct timeval tv_start, tv_now;

  X_LOCK (reslock);
  maxreqs = max_poll_reqs;
  maxtime = max_poll_time;
  X_UNLOCK (reslock);

  if (maxtime)
    gettimeofday (&tv_start, 0);

  for (;;)
    {
      ETP_REQ *req;

      etp_maybe_start_thread ();

      X_LOCK (reslock);
      req = reqq_shift (&res_queue);

      if (req)
        {
          --npending;

          if (!res_queue.size && done_poll_cb)
            done_poll_cb ();
        }

      X_UNLOCK (reslock);

      if (!req)
        return 0;

      X_LOCK (reqlock);
      --nreqs;
      X_UNLOCK (reqlock);

      if (expect_false (req->type == EIO_GROUP && req->size))
        {
          req->int1 = 1; /* mark request as delayed */
          continue;
        }
      else
        {
          int res = ETP_FINISH (req);
          if (expect_false (res))
            return res;
        }

      if (expect_false (maxreqs && !--maxreqs))
        break;

      if (maxtime)
        {
          gettimeofday (&tv_now, 0);

          if (tvdiff (&tv_start, &tv_now) >= maxtime)
            break;
        }
    }

  errno = EAGAIN;
  return -1;
}
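
/* Note: etp_poll returns 0 once the result queue is empty, and -1 with
 * errno set to EAGAIN when it stops early because max_poll_reqs or
 * max_poll_time was hit, i.e. more results are still pending and the
 * caller should poll again. */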

static void etp_cancel (ETP_REQ *req)
{
  X_LOCK   (wrklock);
  req->flags |= EIO_FLAG_CANCELLED;
  X_UNLOCK (wrklock);

  eio_grp_cancel (req);
}

static void etp_submit (ETP_REQ *req)
{
  req->pri -= ETP_PRI_MIN;

  if (expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
  if (expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;

  if (expect_false (req->type == EIO_GROUP))
    {
      /* I hope this is worth it :/ */
      X_LOCK (reqlock);
      ++nreqs;
      X_UNLOCK (reqlock);

      X_LOCK (reslock);

      ++npending;

      if (!reqq_push (&res_queue, req) && want_poll_cb)
        want_poll_cb ();

      X_UNLOCK (reslock);
    }
  else
    {
      X_LOCK (reqlock);
      ++nreqs;
      ++nready;
      reqq_push (&req_queue, req);
      X_COND_SIGNAL (reqwait);
      X_UNLOCK (reqlock);

      etp_maybe_start_thread ();
    }
}

static void etp_set_max_poll_time (double nseconds)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reslock);
  max_poll_time = nseconds * EIO_TICKS; /* convert seconds to the ~1ms ticks that tvdiff reports */
  if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}

static void etp_set_max_poll_reqs (unsigned int maxreqs)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reslock);
  max_poll_reqs = maxreqs;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}

static void etp_set_max_idle (unsigned int nthreads)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  max_idle = nthreads <= 0 ? 1 : nthreads;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}

static void etp_set_min_parallel (unsigned int nthreads)
{
  if (wanted < nthreads)
    wanted = nthreads;
}

static void etp_set_max_parallel (unsigned int nthreads)
{
  if (wanted > nthreads)
    wanted = nthreads;

  while (started > wanted)
    etp_end_thread ();
}

/*****************************************************************************/

static void grp_try_feed (eio_req *grp)
{
  while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
    {
      grp->flags &= ~EIO_FLAG_GROUPADD;

      EIO_FEED (grp);

      /* stop if no progress has been made */
      if (!(grp->flags & EIO_FLAG_GROUPADD))
        {
          grp->feed = 0;
          break;
        }
    }
}

static int grp_dec (eio_req *grp)
{
  --grp->size;

  /* call feeder, if applicable */
  grp_try_feed (grp);

  /* finish, if done */
  if (!grp->size && grp->int1)
    return eio_finish (grp);
  else
    return 0;
}

void eio_destroy (eio_req *req)
{
  if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
  if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);

  EIO_DESTROY (req);
}

static int eio_finish (eio_req *req)
{
  int res = EIO_FINISH (req);

  if (req->grp)
    {
      int res2;
      eio_req *grp = req->grp;

      /* unlink request */
      if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
      if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;

      if (grp->grp_first == req)
        grp->grp_first = req->grp_next;

      res2 = grp_dec (grp);

      if (!res && res2)
        res = res2;
    }

  eio_destroy (req);

  return res;
}

void eio_grp_cancel (eio_req *grp)
{
  for (grp = grp->grp_first; grp; grp = grp->grp_next)
    eio_cancel (grp);
}

void eio_cancel (eio_req *req)
{
  etp_cancel (req);
}

void eio_submit (eio_req *req)
{
  etp_submit (req);
}

unsigned int eio_nreqs (void)
{
  return etp_nreqs ();
}

unsigned int eio_nready (void)
{
  return etp_nready ();
}

unsigned int eio_npending (void)
{
  return etp_npending ();
}

unsigned int eio_nthreads (void)
{
  return etp_nthreads ();
}

void eio_set_max_poll_time (double nseconds)
{
  etp_set_max_poll_time (nseconds);
}

void eio_set_max_poll_reqs (unsigned int maxreqs)
{
  etp_set_max_poll_reqs (maxreqs);
}

void eio_set_max_idle (unsigned int nthreads)
{
  etp_set_max_idle (nthreads);
}

void eio_set_min_parallel (unsigned int nthreads)
{
  etp_set_min_parallel (nthreads);
}

void eio_set_max_parallel (unsigned int nthreads)
{
  etp_set_max_parallel (nthreads);
}

int eio_poll (void)
{
  return etp_poll ();
}

/*****************************************************************************/
/* work around various missing functions */

#if !HAVE_PREADWRITE
# undef pread
# undef pwrite
# define pread  eio__pread
# define pwrite eio__pwrite

static ssize_t
eio__pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  X_LOCK (preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  X_UNLOCK (preadwritelock);

  return res;
}

static ssize_t
eio__pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  X_LOCK (preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET); /* restore the saved file offset, as eio__pread does */
  X_UNLOCK (preadwritelock);

  return res;
}
#endif

#if !HAVE_FUTIMES

# undef utimes
# undef futimes
# define utimes(path,times)  eio__utimes  (path, times)
# define futimes(fd,times)   eio__futimes (fd, times)

static int
eio__utimes (const char *filename, const struct timeval times[2])
{
  if (times)
    {
      struct utimbuf buf;

      buf.actime  = times[0].tv_sec;
      buf.modtime = times[1].tv_sec;

      return utime (filename, &buf);
    }
  else
    return utime (filename, 0);
}

static int eio__futimes (int fd, const struct timeval tv[2])
{
  errno = ENOSYS;
  return -1;
}

#endif

#if !HAVE_FDATASYNC
# undef fdatasync
# define fdatasync(fd) fsync (fd)
#endif

/* sync_file_range always needs emulation */
int
eio__sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags)
{
#if HAVE_SYNC_FILE_RANGE
  int res;

  if (EIO_SYNC_FILE_RANGE_WAIT_BEFORE   != SYNC_FILE_RANGE_WAIT_BEFORE
      || EIO_SYNC_FILE_RANGE_WRITE      != SYNC_FILE_RANGE_WRITE
      || EIO_SYNC_FILE_RANGE_WAIT_AFTER != SYNC_FILE_RANGE_WAIT_AFTER)
    {
      flags = 0
         | (flags & EIO_SYNC_FILE_RANGE_WAIT_BEFORE ? SYNC_FILE_RANGE_WAIT_BEFORE : 0)
         | (flags & EIO_SYNC_FILE_RANGE_WRITE       ? SYNC_FILE_RANGE_WRITE       : 0)
         | (flags & EIO_SYNC_FILE_RANGE_WAIT_AFTER  ? SYNC_FILE_RANGE_WAIT_AFTER  : 0);
    }

  res = sync_file_range (fd, offset, nbytes, flags);

  if (!res || errno != ENOSYS)
    return res;
#endif

  /* even though we could play tricks with the flags, it's better to always
   * call fdatasync, as that matches the expectations of its users best */
  return fdatasync (fd);
}

#if !HAVE_READAHEAD
# undef readahead
# define readahead(fd,offset,count) eio__readahead (fd, offset, count, self)

static ssize_t
eio__readahead (int fd, off_t offset, size_t count, etp_worker *self)
{
  size_t todo = count;
  dBUF;

  while (todo > 0)
    {
      size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;

      pread (fd, eio_buf, len, offset);
      offset += len;
      todo   -= len;
    }

  errno = 0;
  return count;
}

#endif

/* sendfile always needs emulation */
static ssize_t
eio__sendfile (int ofd, int ifd, off_t offset, size_t count, etp_worker *self)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes;

    vec.sfv_fd   = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off  = offset;
    vec.sfv_len  = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  if (res <  0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
      )
    {
      /* emulate sendfile. this is a major pain in the ass */
      dBUF;

      res = 0;

      while (count)
        {
          ssize_t cnt;

          cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          cnt = write (ofd, eio_buf, cnt);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          offset += cnt;
          res    += cnt;
          count  -= cnt;
        }
    }

  return res;
}

/* read a full directory */
static void
eio__scandir (eio_req *req, etp_worker *self)
{
  DIR *dirp;
  EIO_STRUCT_DIRENT *entp;
  char *name, *names;
  int memlen = 4096;
  int memofs = 0;
  int res = 0;

  X_LOCK (wrklock);
  /* the corresponding closedir is in ETP_WORKER_CLEAR */
  self->dirp = dirp = opendir (req->ptr1);
  req->flags |= EIO_FLAG_PTR2_FREE;
  req->ptr2 = names = malloc (memlen);
  X_UNLOCK (wrklock);

  if (dirp && names)
    for (;;)
      {
        errno = 0;
        entp = readdir (dirp);

        if (!entp)
          break;

        name = entp->d_name;

        if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
          {
            int len = strlen (name) + 1;

            res++;

            while (memofs + len > memlen)
              {
                memlen *= 2;
                X_LOCK (wrklock);
                req->ptr2 = names = realloc (names, memlen);
                X_UNLOCK (wrklock);

                if (!names)
                  break;
              }

            memcpy (names + memofs, name, len);
            memofs += len;
          }
      }

  if (errno)
    res = -1;

  req->result = res;
}
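
/* Note: on success, req->result holds the number of entries (excluding "."
 * and ".."), and req->ptr2 holds their names back-to-back, each
 * NUL-terminated, in readdir order. */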

#if !(_POSIX_MAPPED_FILES && _POSIX_SYNCHRONIZED_IO)
# undef msync
# define msync(a,b,c) ((errno = ENOSYS), -1)
#endif

int
eio__mtouch (void *mem, size_t len, int flags)
{
  intptr_t addr = (intptr_t)mem;
  intptr_t end = addr + len;
#ifdef PAGESIZE
  const intptr_t page = PAGESIZE;
#else
  static intptr_t page;

  if (!page)
    page = sysconf (_SC_PAGESIZE);
#endif

  addr &= ~(page - 1); /* assume page size is always a power of two */

  /* the loops must run until the end address, not the length */
  if (addr < end)
    if (flags) /* modify */
      do { *((volatile sig_atomic_t *)addr) |= 0; } while ((addr += page) < end);
    else
      do { *((volatile sig_atomic_t *)addr)     ; } while ((addr += page) < end);

  return 0;
}
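
/* Note: eio__mtouch walks the mapping one word per page; the volatile read
 * (or, with flags set, the no-op read-modify-write "|= 0") forces each page
 * into memory, or dirties it, without changing its contents. */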

/*****************************************************************************/

#define ALLOC(len)                              \
  if (!req->ptr2)                               \
    {                                           \
      X_LOCK (wrklock);                         \
      req->flags |= EIO_FLAG_PTR2_FREE;         \
      X_UNLOCK (wrklock);                       \
      req->ptr2 = malloc (len);                 \
      if (!req->ptr2)                           \
        {                                       \
          errno       = ENOMEM;                 \
          req->result = -1;                     \
          break;                                \
        }                                       \
    }

X_THREAD_PROC (etp_proc)
{
  ETP_REQ *req;
  struct timespec ts;
  etp_worker *self = (etp_worker *)thr_arg;

  /* try to distribute timeouts somewhat randomly */
  ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);

  for (;;)
    {
      X_LOCK (reqlock);

      for (;;)
        {
          self->req = req = reqq_shift (&req_queue);

          if (req)
            break;

          ++idle;

          ts.tv_sec = time (0) + IDLE_TIMEOUT;
          if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
            {
              if (idle > max_idle)
                {
                  --idle;
                  X_UNLOCK (reqlock);
                  X_LOCK (wrklock);
                  --started;
                  X_UNLOCK (wrklock);
                  goto quit;
                }

              /* we are allowed to idle, so do so without any timeout */
              X_COND_WAIT (reqwait, reqlock);
            }

          --idle;
        }

      --nready;

      X_UNLOCK (reqlock);

      if (req->type < 0)
        goto quit;

      if (!EIO_CANCELLED (req))
        ETP_EXECUTE (self, req);

      X_LOCK (reslock);

      ++npending;

      if (!reqq_push (&res_queue, req) && want_poll_cb)
        want_poll_cb ();

      self->req = 0;
      etp_worker_clear (self);

      X_UNLOCK (reslock);
    }

quit:
  X_LOCK (wrklock);
  etp_worker_free (self);
  X_UNLOCK (wrklock);

  return 0;
}

/*****************************************************************************/

int eio_init (void (*want_poll)(void), void (*done_poll)(void))
{
  return etp_init (want_poll, done_poll);
}
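
/* Illustrative integration sketch (not part of this file): want_poll fires
 * from a worker thread when the first result becomes pending, done_poll when
 * the result queue drains. A typical embedding wakes its event loop and then
 * calls eio_poll until it stops returning -1/EAGAIN. The pipe-based wakeup
 * and names below are assumptions for illustration only:
 *
 *   static int wakeup_pipe[2];
 *   static void my_want_poll (void) { char c = 0; write (wakeup_pipe[1], &c, 1); }
 *   static void my_done_poll (void) { }
 *
 *   // setup: pipe (wakeup_pipe); eio_init (my_want_poll, my_done_poll);
 *   // event loop, when wakeup_pipe[0] is readable: drain the pipe, then
 *   //   while (eio_poll () == -1) ; // process all pending results
 */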

static void eio_api_destroy (eio_req *req)
{
  free (req);
}

#define REQ(rtype)                                      \
  eio_req *req;                                         \
                                                        \
  req = (eio_req *)calloc (1, sizeof *req);             \
  if (!req)                                             \
    return 0;                                           \
                                                        \
  req->type    = rtype;                                 \
  req->pri     = pri;                                   \
  req->finish  = cb;                                    \
  req->data    = data;                                  \
  req->destroy = eio_api_destroy;

#define SEND eio_submit (req); return req

#define PATH                                    \
  req->flags |= EIO_FLAG_PTR1_FREE;             \
  req->ptr1 = strdup (path);                    \
  if (!req->ptr1)                               \
    {                                           \
      eio_api_destroy (req);                    \
      return 0;                                 \
    }

static void eio_execute (etp_worker *self, eio_req *req)
{
  errno = 0;

  switch (req->type)
    {
      case EIO_READ:      ALLOC (req->size);
                          req->result = req->offs >= 0
                                      ? pread     (req->int1, req->ptr2, req->size, req->offs)
                                      : read      (req->int1, req->ptr2, req->size); break;
      case EIO_WRITE:     req->result = req->offs >= 0
                                      ? pwrite    (req->int1, req->ptr2, req->size, req->offs)
                                      : write     (req->int1, req->ptr2, req->size); break;

      case EIO_READAHEAD: req->result = readahead     (req->int1, req->offs, req->size); break;
      case EIO_SENDFILE:  req->result = eio__sendfile (req->int1, req->int2, req->offs, req->size, self); break;

      case EIO_STAT:      ALLOC (sizeof (EIO_STRUCT_STAT));
                          req->result = stat      (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
      case EIO_LSTAT:     ALLOC (sizeof (EIO_STRUCT_STAT));
                          req->result = lstat     (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
      case EIO_FSTAT:     ALLOC (sizeof (EIO_STRUCT_STAT));
                          req->result = fstat     (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;

      case EIO_CHOWN:     req->result = chown     (req->ptr1, req->int2, req->int3); break;
      case EIO_FCHOWN:    req->result = fchown    (req->int1, req->int2, req->int3); break;
      case EIO_CHMOD:     req->result = chmod     (req->ptr1, (mode_t)req->int2); break;
      case EIO_FCHMOD:    req->result = fchmod    (req->int1, (mode_t)req->int2); break;
      case EIO_TRUNCATE:  req->result = truncate  (req->ptr1, req->offs); break;
      case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;

      case EIO_OPEN:      req->result = open      (req->ptr1, req->int1, (mode_t)req->int2); break;
      case EIO_CLOSE:     req->result = close     (req->int1); break;
      case EIO_DUP2:      req->result = dup2      (req->int1, req->int2); break;
      case EIO_UNLINK:    req->result = unlink    (req->ptr1); break;
      case EIO_RMDIR:     req->result = rmdir     (req->ptr1); break;
      case EIO_MKDIR:     req->result = mkdir     (req->ptr1, (mode_t)req->int2); break;
      case EIO_RENAME:    req->result = rename    (req->ptr1, req->ptr2); break;
      case EIO_LINK:      req->result = link      (req->ptr1, req->ptr2); break;
      case EIO_SYMLINK:   req->result = symlink   (req->ptr1, req->ptr2); break;
      case EIO_MKNOD:     req->result = mknod     (req->ptr1, (mode_t)req->int2, (dev_t)req->int3); break;

      case EIO_READLINK:  ALLOC (NAME_MAX);
                          req->result = readlink  (req->ptr1, req->ptr2, NAME_MAX); break;

      case EIO_SYNC:      req->result = 0; sync (); break;
      case EIO_FSYNC:     req->result = fsync     (req->int1); break;
      case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
      case EIO_MSYNC:     req->result = msync (req->ptr2, req->size, req->int1); break;
      case EIO_MTOUCH:    req->result = eio__mtouch (req->ptr2, req->size, req->int1); break;
      case EIO_SYNC_FILE_RANGE: req->result = eio__sync_file_range (req->int1, req->offs, req->size, req->int2); break;

      case EIO_READDIR:   eio__scandir (req, self); break;

      case EIO_BUSY:
#ifdef _WIN32
        Sleep (req->nv1 * 1000.);
#else
        {
          struct timeval tv;

          tv.tv_sec  = req->nv1;
          tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;

          req->result = select (0, 0, 0, 0, &tv);
        }
#endif
        break;

      case EIO_UTIME:
      case EIO_FUTIME:
        {
          struct timeval tv[2];
          struct timeval *times;

          if (req->nv1 != -1. || req->nv2 != -1.)
            {
              tv[0].tv_sec  = req->nv1;
              tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
              tv[1].tv_sec  = req->nv2;
              tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;

              times = tv;
            }
          else
            times = 0;

          req->result = req->type == EIO_FUTIME
                        ? futimes (req->int1, times)
                        : utimes  (req->ptr1, times);
        }
        break;

      case EIO_GROUP:
        abort (); /* handled in eio_request */

      case EIO_NOP:
        req->result = 0;
        break;

      case EIO_CUSTOM:
        ((void (*)(eio_req *))req->feed) (req);
        break;

      default:
        req->result = -1;
        break;
    }

  req->errorno = errno;
}

#ifndef EIO_NO_WRAPPERS

eio_req *eio_nop (int pri, eio_cb cb, void *data)
{
  REQ (EIO_NOP); SEND;
}

eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data)
{
  REQ (EIO_BUSY); req->nv1 = delay; SEND;
}

eio_req *eio_sync (int pri, eio_cb cb, void *data)
{
  REQ (EIO_SYNC); SEND;
}

eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FSYNC); req->int1 = fd; SEND;
}

eio_req *eio_msync (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data)
{
  REQ (EIO_MSYNC); req->ptr2 = addr; req->size = length; req->int1 = flags; SEND;
}

eio_req *eio_mtouch (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data)
{
  REQ (EIO_MTOUCH); req->ptr2 = addr; req->size = length; req->int1 = flags; SEND;
}

eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data)
{
  REQ (EIO_SYNC_FILE_RANGE); req->int1 = fd; req->offs = offset; req->size = nbytes; req->int2 = flags; SEND;
}

eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FDATASYNC); req->int1 = fd; SEND;
}

eio_req *eio_close (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CLOSE); req->int1 = fd; SEND;
}

eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data)
{
  REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
}

eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
}

eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
}

eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FSTAT); req->int1 = fd; SEND;
}

eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
}

eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
}

eio_req *eio_fchmod (int fd, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
}

eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
}

eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data)
{
  REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND;
}

eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data)
{
  REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
}

eio_req *eio_open (const char *path, int flags, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
}

eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data)
{
  REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
}

eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND;
}

eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
}

eio_req *eio_chmod (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND;
}

eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
}

static eio_req *
eio__1path (int type, const char *path, int pri, eio_cb cb, void *data)
{
  REQ (type); PATH; SEND;
}

eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_READLINK, path, pri, cb, data);
}

eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_STAT, path, pri, cb, data);
}

eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_LSTAT, path, pri, cb, data);
}

eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_UNLINK, path, pri, cb, data);
}

eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_RMDIR, path, pri, cb, data);
}

eio_req *eio_readdir (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_READDIR, path, pri, cb, data);
}

eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, int pri, eio_cb cb, void *data)
{
  REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->int3 = (long)dev; SEND;
}

static eio_req *
eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  REQ (type); PATH;

  req->flags |= EIO_FLAG_PTR2_FREE;
  req->ptr2 = strdup (new_path);
  if (!req->ptr2)
    {
      eio_api_destroy (req);
      return 0;
    }

  SEND;
}

eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_LINK, path, new_path, pri, cb, data);
}

eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data);
}

eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
}

eio_req *eio_custom (eio_cb execute, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CUSTOM); req->feed = (void (*)(eio_req *))execute; SEND;
}

#endif

eio_req *eio_grp (eio_cb cb, void *data)
{
  const int pri = EIO_PRI_MAX;

  REQ (EIO_GROUP); SEND;
}

#undef REQ
#undef PATH
#undef SEND

/*****************************************************************************/
/* grp functions */

void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
{
  grp->int2 = limit;
  grp->feed = feed;

  grp_try_feed (grp);
}

void eio_grp_limit (eio_req *grp, int limit)
{
  grp->int2 = limit;

  grp_try_feed (grp);
}

void eio_grp_add (eio_req *grp, eio_req *req)
{
  assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));

  grp->flags |= EIO_FLAG_GROUPADD;

  ++grp->size;
  req->grp = grp;

  req->grp_prev = 0;
  req->grp_next = grp->grp_first;

  if (grp->grp_first)
    grp->grp_first->grp_prev = req;

  grp->grp_first = req;
}
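
/* Hedged usage sketch: a group collects sub-requests so that one callback
 * fires after all of them (and the group itself) have finished. The callback
 * name below is an illustrative assumption, not part of this file:
 *
 *   eio_req *grp = eio_grp (grp_done_cb, 0);
 *   eio_grp_add (grp, eio_stat ("/etc/passwd", 0, 0, 0));
 *   eio_grp_add (grp, eio_stat ("/etc/group",  0, 0, 0));
 *   // grp_done_cb runs (from eio_poll) only after both stats completed
 *
 * A feeder (eio_grp_feed) can instead generate sub-requests lazily, keeping
 * at most `limit` of them in flight at a time. */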

/*****************************************************************************/
/* misc garbage */

ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
{
  etp_worker wrk;
  ssize_t ret;

  wrk.dbuf = 0;

  ret = eio__sendfile (ofd, ifd, offset, count, &wrk);

  if (wrk.dbuf)
    free (wrk.dbuf);

  return ret; /* the function is declared ssize_t; without this the result was silently dropped */
}