srcs/toolbox/rb.c

00001 /* 
00002  * Copyright (c) 2005-2012 by KoanLogic s.r.l.
00003  */
00004 
00005 #include <sys/types.h>
00006 #include <stdlib.h>
00007 #include <string.h>
00008 #include <unistd.h>
00009 #include <u/toolbox/rb.h>
00010 #include <u/toolbox/misc.h>
00011 #include <u/toolbox/carpal.h>
00012 
00013 #if defined(U_RB_CAN_MMAP)
00014   #include <sys/mman.h>
00015 #if defined(HAVE_SYSCONF) && defined(_SC_PAGE_SIZE)
00016   #define u_vm_page_sz  sysconf(_SC_PAGE_SIZE)
00017 #elif defined (HAVE_SYSCONF) && defined(_SC_PAGESIZE)
00018   #define u_vm_page_sz  sysconf(_SC_PAGESIZE)
00019 #elif defined(HAVE_GETPAGESIZE)
00020   #define u_vm_page_sz  getpagesize()
00021 #else
00022   #error "don't know how to get page size.  reconfigure with --no_ringbuffer."
00023 #endif
00024 #endif  /* U_RB_CAN_MMAP */
00025 
/* Ring buffer state.  Two implementations share this object: an mmap(2)
 * based one (two adjacent VM mappings of the same pages mirror data
 * automatically) and a malloc(3) based one.  The active implementation is
 * selected through the cb_* methods installed at creation time. */
struct u_rb_s
{
    char *base;     /* base address of the memory buffer. */
    size_t sz;      /* ring buffer size */
    size_t wr_off;  /* write offset */
    size_t rd_off;  /* read offset */
    size_t ready;   /* bytes ready to be read */
    int opts;       /* options (U_RB_OPT_* creation flags) */

    /* Implementation specific methods. */
    void (*cb_free) (struct u_rb_s *rb);
    ssize_t (*cb_read) (struct u_rb_s *rb, void *b, size_t b_sz);
    void *(*cb_fast_read) (struct u_rb_s *rb, size_t *pb_sz);   /* NULL when
                                   contiguous memory was not requested */
    ssize_t (*cb_write) (struct u_rb_s *rb, const void *b, size_t b_sz);
};
00041 
00042 #define read_addr(rb)   (rb->base + rb->rd_off)
00043 #define write_addr(rb)  (rb->base + rb->wr_off)
00044 
00045 static void write_incr_contiguous (u_rb_t *rb, size_t cnt);
00046 static void write_incr_wrapped (u_rb_t *rb, size_t cnt);
00047 static void read_incr_wrapped (u_rb_t *rb, size_t cnt);
00048 static void read_incr_contiguous (u_rb_t *rb, size_t cnt);
00049 static ssize_t write_contiguous (u_rb_t *rb, const void *b, size_t b_sz);
00050 static ssize_t write_wrapped (u_rb_t *rb, const void *b, size_t b_sz);
00051 static ssize_t read_contiguous (u_rb_t *rb, void *b, size_t b_sz);
00052 static ssize_t read_wrapped (u_rb_t *rb, void *b, size_t b_sz);
00053 static void *fast_read (u_rb_t *rb, size_t *pb_sz);
00054 static int is_wrapped (u_rb_t *rb);
00055 static int mirror (u_rb_t *rb, const void *b, size_t to_be_written);
00056 
00057 #if defined(U_RB_CAN_MMAP)  /* specific to mmap(2) based implementation. */
00058   static int create_mmap (size_t hint_sz, int opts, u_rb_t **prb);
00059   static void free_mmap (u_rb_t *rb);
00060   static size_t round_sz (size_t sz);
00061 #endif  /* U_RB_CAN_MMAP */
00062 
00063 static int create_malloc (size_t sz, int opts, u_rb_t **prb);
00064 static void free_malloc (u_rb_t *rb);
00065 
00153 int u_rb_create (size_t hint_sz, int opts, u_rb_t **prb)
00154 {
00155     if (opts & U_RB_OPT_IMPL_MALLOC)
00156         return create_malloc(hint_sz, opts, prb);
00157 
00158 #if defined(U_RB_CAN_MMAP)
00159     /* Default is to use mmap implementation. */
00160     return create_mmap(hint_sz, opts, prb);
00161 #else   /* !U_RB_CAN_MMAP */
00162     /* Fallback to malloc in case mmap is not available. */
00163     return create_malloc(hint_sz, opts | U_RB_OPT_IMPL_MALLOC, prb);
00164 #endif  /* U_RB_CAN_MMAP */
00165 }
00166 
00176 void u_rb_free (u_rb_t *rb)
00177 {
00178     if (rb && rb->cb_free)
00179         rb->cb_free(rb);
00180     return;
00181 }
00182 
00193 size_t u_rb_size (u_rb_t *rb)
00194 {
00195     return rb->sz;
00196 }
00197 
00213 ssize_t u_rb_write (u_rb_t *rb, const void *b, size_t b_sz)
00214 {
00215     ssize_t rc;
00216 
00217     dbg_return_if (rb == NULL, -1);
00218     dbg_return_if (rb->cb_write == NULL, -1); 
00219 
00220     /* Also increment the number of ready bytes. */
00221     if ((rc = rb->cb_write(rb, b, b_sz)) >= 0)
00222         rb->ready += (size_t) rc;
00223 
00224     return rc;
00225 }
00226 
00244 ssize_t u_rb_read (u_rb_t *rb, void *b, size_t b_sz)
00245 {
00246     ssize_t rc;
00247 
00248     dbg_return_if (rb == NULL, -1);
00249     dbg_return_if (rb->cb_read == NULL, -1); 
00250 
00251     /* Decrement the number of ready bytes. */
00252     if ((rc = rb->cb_read(rb, b, b_sz)) >= 0)
00253         rb->ready -= (size_t) rc;
00254 
00255     return rc;
00256 }
00257 
00278 void *u_rb_fast_read (u_rb_t *rb, size_t *pb_sz)
00279 {
00280     dbg_return_if (rb == NULL, NULL);
00281     dbg_return_if (rb->cb_write == NULL, NULL);
00282 
00283     return rb->cb_fast_read(rb, pb_sz);
00284 }
00285 
00296 int u_rb_clear (u_rb_t *rb)
00297 {
00298     dbg_return_if (rb == NULL, -1);
00299     rb->wr_off = rb->rd_off = rb->ready = 0;
00300     return 0;
00301 }
00302  
00313 size_t u_rb_ready (u_rb_t *rb)
00314 {
00315     /* Let it crash on NULL 'rb's */
00316     return rb->ready;
00317 }
00318  
00330 size_t u_rb_avail (u_rb_t *rb)
00331 {
00332     return (rb->sz - rb->ready);
00333 }
00334 
00340 /* just shift the write pointer */
00341 static void write_incr_contiguous (u_rb_t *rb, size_t cnt)
00342 {
00343     rb->wr_off += cnt;
00344 }
00345 
00346 /* shift the write pointer */
00347 static void write_incr_wrapped (u_rb_t *rb, size_t cnt)
00348 {
00349     rb->wr_off = (rb->wr_off + cnt) % rb->sz;
00350 }
00351 
00352 /* shift the read pointer */
00353 static void read_incr_wrapped (u_rb_t *rb, size_t cnt)
00354 {
00355     rb->rd_off = (rb->rd_off + cnt) % rb->sz;
00356 }
00357 
00358 /* shift the read pointer of 'cnt' positions */
00359 static void read_incr_contiguous (u_rb_t *rb, size_t cnt)
00360 {
00361     /* 
00362      * assume 'rb' and 'cnt' sanitized 
00363      */
00364     rb->rd_off += cnt;
00365 
00366     /* When the read offset is advanced into the second virtual-memory region, 
00367      * both offsets - read and write - are decremented by the length of the 
00368      * underlying buffer */
00369     if (rb->rd_off >= rb->sz)
00370     {
00371         rb->rd_off -= rb->sz;
00372         rb->wr_off -= rb->sz;
00373     }
00374 
00375     return;
00376 }
00377 
00378 static void *fast_read (u_rb_t *rb, size_t *pb_sz)
00379 {
00380     void *data = NULL;
00381 
00382     dbg_return_if (rb == NULL, NULL);
00383     dbg_return_if (!(rb->opts & U_RB_OPT_USE_CONTIGUOUS_MEM), NULL);
00384     dbg_return_if (pb_sz == NULL, NULL);
00385     dbg_return_if (*pb_sz > u_rb_size(rb), NULL);
00386 
00387     /* if there is nothing ready to be read go out immediately */
00388     nop_goto_if (!(*pb_sz = U_MIN(u_rb_ready(rb), *pb_sz)), end);
00389 
00390     data = read_addr(rb);
00391     read_incr_contiguous(rb, *pb_sz);
00392     rb->ready -= *pb_sz;
00393 
00394     /* fall through */
00395 end:
00396     return data;
00397 }
00398 
00399 static int is_wrapped (u_rb_t *rb)
00400 {
00401     return (rb->rd_off > rb->wr_off);
00402 }
00403 
00404 static ssize_t write_contiguous (u_rb_t *rb, const void *b, size_t b_sz)
00405 {
00406     size_t to_be_written;
00407 
00408     dbg_return_if (rb == NULL, -1);
00409     dbg_return_if (b == NULL, -1);
00410     dbg_return_if (b_sz > u_rb_size(rb), -1);
00411 
00412     nop_goto_if (!(to_be_written = U_MIN(u_rb_avail(rb), b_sz)), end);
00413 
00414     memcpy(write_addr(rb), b, to_be_written);
00415 
00416     /* When using the malloc'd buffer, we need to take care of data mirroring
00417      * manually. */
00418     if (rb->opts & U_RB_OPT_IMPL_MALLOC)
00419         (void) mirror(rb, b, to_be_written);
00420     
00421     write_incr_contiguous(rb, to_be_written);
00422 
00423     /* fall through */
00424 end:
00425     return to_be_written;
00426 }
00427 
/* Replicate the 'to_be_written' bytes just stored at write_addr(rb) into
 * the twin half of the malloc'd double-sized buffer, so that contiguous
 * reads never need to wrap.  Note that in contiguous mode 'wr_off' may
 * legitimately range over [0, 2*sz) until read_incr_contiguous() rebases
 * it, hence the three cases below.  Always returns 0. */
static int mirror (u_rb_t *rb, const void *b, size_t to_be_written)
{
    size_t right_sz, left_sz;

    if (rb->wr_off + to_be_written <= rb->sz)   /* Left. */
    {
        /* Write fell entirely in the first half: duplicate it at the same
         * offset in the second half. */
        memcpy(write_addr(rb) + rb->sz, b, to_be_written);
    } 
    else if (rb->wr_off >= rb->sz)              /* Right. */
    {
        /* Write fell entirely in the second half: mirror it back into the
         * first half. */
        memcpy(write_addr(rb) - rb->sz, b, to_be_written); 
    } 
    else    /* !Right && !Left (i.e. Cross). */
    {
        left_sz = rb->sz - rb->wr_off;
        right_sz = to_be_written - left_sz;

        /* When the write has crossed the middle, we need to scatter the
         * left and right side of the buffer at the end of the second
         * half and at the beginning of the first half respectively. */
        memcpy(write_addr(rb) + rb->sz, b, left_sz);
        memcpy(rb->base, (const char *) b + left_sz, right_sz);
    }

    return 0;
}
00454 
/* Write callback for the non-contiguous malloc(3) implementation: append
 * up to 'b_sz' bytes from 'b', splitting the copy between buffer tail and
 * head when needed.  Returns the number of bytes consumed (0 when full),
 * or -1 on error. */
static ssize_t write_wrapped (u_rb_t *rb, const void *b, size_t b_sz)
{
    size_t to_be_written, rspace, rem;

    dbg_return_if (rb == NULL, -1);
    dbg_return_if (b == NULL, -1);
    dbg_return_if (b_sz > u_rb_size(rb), -1);

    nop_goto_if (!(to_be_written = U_MIN(u_rb_avail(rb), b_sz)), end);

    /* If rb has wrapped-around, or the requested amount of data
     * don't need to be fragmented between tail and head, append 
     * it at the write offset.
     * When wrapped, the free space lies in one contiguous chunk between
     * wr_off and rd_off, so no split is ever needed; note that 'rspace'
     * (bytes from wr_off to the buffer end) is computed -- and is only
     * meaningful -- on the non-wrapped path, via short-circuit
     * evaluation. */
    if (is_wrapped(rb) || (rspace = (rb->sz - rb->wr_off)) >= to_be_written)
    {
        memcpy(write_addr(rb), b, to_be_written);
    }
    else
    {
        rem = to_be_written - rspace;
        /* Handle head-tail fragmentation: the available buffer space on the
         * right side is not enough to hold the supplied data. */
        memcpy(write_addr(rb), b, rspace);
        memcpy(rb->base, (const char *) b + rspace, rem);
    }

    write_incr_wrapped(rb, to_be_written);

    /* fall through */
end:
    return to_be_written;
}
00487 
00488 static ssize_t read_wrapped (u_rb_t *rb, void *b, size_t b_sz)
00489 {
00490     size_t to_be_read, rspace, rem;
00491 
00492     dbg_return_if (b == NULL, -1);
00493     dbg_return_if (rb == NULL, -1);
00494     dbg_return_if (b_sz > u_rb_size(rb), -1);
00495 
00496     /* if there is nothing ready to be read go out immediately */
00497     nop_goto_if (!(to_be_read = U_MIN(u_rb_ready(rb), b_sz)), end);
00498 
00499     if (!is_wrapped(rb) || (rspace = (rb->sz - rb->rd_off)) <= to_be_read)
00500     {
00501         memcpy(b, read_addr(rb), to_be_read);
00502         read_incr_wrapped(rb, to_be_read);
00503     }
00504     else
00505     {
00506         rem = to_be_read - rspace;
00507         memcpy(b, read_addr(rb), rspace); 
00508         memcpy((char *) b + rspace, rb->base, rem);
00509         rb->rd_off = rem;
00510     }
00511 
00512     /* fall through */
00513 end:
00514     return to_be_read;
00515 }
00516 
00517 static ssize_t read_contiguous (u_rb_t *rb, void *b, size_t b_sz)
00518 {
00519     size_t to_be_read;
00520 
00521     dbg_return_if (b == NULL, -1);
00522     dbg_return_if (rb == NULL, -1);
00523     dbg_return_if (b_sz > u_rb_size(rb), -1);
00524 
00525     /* if there is nothing ready to be read go out immediately */
00526     nop_goto_if (!(to_be_read = U_MIN(u_rb_ready(rb), b_sz)), end);
00527 
00528     memcpy(b, read_addr(rb), to_be_read);
00529     read_incr_contiguous(rb, to_be_read);
00530 
00531     /* fall through */
00532 end:
00533     return to_be_read;
00534 }
00535 
00536 #if defined(U_RB_CAN_MMAP)
00537 
00538 static int create_mmap (size_t hint_sz, int opts, u_rb_t **prb)
00539 {
00540     int fd = -1;
00541     u_rb_t *rb = NULL;
00542     char path[] = "/tmp/rb-XXXXXX";
00543 
00544     dbg_err_sif ((rb = u_zalloc(sizeof(u_rb_t))) == NULL);
00545     dbg_err_sif ((fd = mkstemp(path)) == -1);
00546     dbg_err_sif (u_remove(path));
00547  
00548     /* round the supplied size to a page multiple (mmap is quite picky
00549      * about page boundary alignment) */
00550     rb->sz = round_sz(hint_sz);
00551     rb->wr_off = rb->rd_off = rb->ready = 0;
00552     rb->opts = opts;
00553 
00554     /* Set mmap methods.  Always use the contiguous methods, silently 
00555      * ignoring any contraddictory user request. */
00556     rb->cb_free = free_mmap;
00557     rb->cb_read = read_contiguous;
00558     rb->cb_fast_read = fast_read;
00559     rb->cb_write = write_contiguous;
00560 
00561     dbg_err_sif (ftruncate(fd, rb->sz) == -1);
00562 
00563     /* mmap 2*rb->sz bytes. this is just a commodity map that will be 
00564      * discarded by the two following "half" maps.  we use it just to let the 
00565      * system choose a suitable base address (one that we are sure it's 
00566      * a multiple of the page size) that we can safely reuse later on when
00567      * pretending to MAP_FIXED */
00568     rb->base = mmap(NULL, rb->sz << 1, PROT_NONE, 
00569             MAP_ANON | MAP_PRIVATE, -1, 0);
00570     dbg_err_sif (rb->base == MAP_FAILED);
00571 
00572     /* POSIX: "The mapping established by mmap() shall replace any previous 
00573      * mappings for those whole pages containing any part of the address space 
00574      * of the process starting at pa and continuing for len bytes."
00575      * So, next two mappings replace in-toto the first 'rb->base' map which
00576      * does not need to be explicitly munmap'd */
00577  
00578     /* first half of the mmap'd region.  use MAP_SHARED to "twin" the two
00579      * mmap'd regions: each byte stored at a given offset in the first half
00580      * will show up at the same offset in the second half and viceversa */
00581     dbg_err_sif (mmap(rb->base, rb->sz, PROT_READ | PROT_WRITE, 
00582             MAP_FIXED | MAP_SHARED, fd, 0) != rb->base);
00583   
00584     /* second half is first's twin: they are attached to the same file 
00585      * descriptor 'fd', hence their pairing is handled at the OS level */
00586     dbg_err_sif (mmap(rb->base + rb->sz, rb->sz, PROT_READ | PROT_WRITE,
00587             MAP_FIXED | MAP_SHARED, fd, 0) != rb->base + rb->sz);
00588  
00589     /* dispose the file descriptor */
00590     dbg_err_sif (close(fd) == -1);
00591 
00592     *prb = rb;
00593 
00594     return 0;
00595 err:
00596     u_rb_free(rb);
00597     U_CLOSE(fd);
00598     return -1;
00599 }
00600 
00601 static void free_mmap (u_rb_t *rb)
00602 {
00603     nop_return_if (rb == NULL, );
00604 
00605     /* "All pages containing a part of the indicated range are unmapped",
00606      * hence the following single munmap with a double length should be ok for 
00607      * both previous (contiguous) mmap's */
00608     dbg_return_sif (rb->base && (munmap(rb->base, rb->sz << 1) == -1), );
00609     u_free(rb);
00610 
00611     return;
00612 }
00613 
00614 /* round requested size to a multiple of PAGESIZE */
00615 static size_t round_sz (size_t sz)
00616 {
00617     size_t pg_sz = (size_t) u_vm_page_sz;
00618 
00619     return !sz ? pg_sz : (((sz - 1) / pg_sz) + 1) * pg_sz;
00620 }
00621 
00622 #endif  /* U_RB_CAN_MMAP */
00623 
00624 static int create_malloc (size_t sz, int opts, u_rb_t **prb)
00625 {
00626     size_t real_sz;
00627     u_rb_t *rb = NULL;
00628 
00629     dbg_return_if (sz == 0, -1);
00630     dbg_return_if (prb == NULL, -1);
00631 
00632     dbg_err_sif ((rb = u_zalloc(sizeof *rb)) == NULL);
00633 
00634     rb->opts = opts;
00635 
00636     /* Initialize counters. */
00637     rb->wr_off = rb->rd_off = rb->ready = 0;
00638     rb->sz = real_sz = sz;
00639 
00640     /* Double buffer size in case the user requested contiguous memory. */
00641     if (opts & U_RB_OPT_USE_CONTIGUOUS_MEM)
00642         real_sz <<= 1;
00643 
00644     /* Make room for the buffer. */
00645     dbg_err_sif ((rb->base = u_zalloc(real_sz)) == NULL);
00646 
00647     /* Set implementation specific callbacks. */
00648     rb->cb_free = free_malloc;
00649 
00650     if (opts & U_RB_OPT_USE_CONTIGUOUS_MEM) 
00651     {
00652         rb->cb_fast_read = fast_read;
00653         rb->cb_write = write_contiguous;
00654         rb->cb_read = read_contiguous;
00655     }
00656     else
00657     {
00658         rb->cb_fast_read = NULL;
00659         rb->cb_write = write_wrapped;
00660         rb->cb_read = read_wrapped;
00661     }
00662 
00663     *prb = rb;
00664 
00665     return 0;
00666 err:
00667     free_malloc(rb);
00668     return -1;
00669 }
00670 
00671 static void free_malloc (u_rb_t *rb)
00672 {
00673     nop_return_if (rb == NULL, );
00674 
00675     if (rb->base)
00676         u_free(rb->base);
00677     u_free(rb);
00678 
00679     return;
00680 }

←Products
© 2005-2012 - KoanLogic S.r.l. - All rights reserved