Copyright © 2012-2014 Georgia Tech Research Corporation
Copyright © 2015 Rice University
2015-02-02
Revision History | ||
---|---|---|
Revision 2.0.0 | 2015-02-02 | Revised by: NTD |
New Release with kernel channels and improved mDNS integration | ||
Revision 1.2.1 | 2014-04-07 | Revised by: NTD |
New Release with achd period, SSH example, ACH_OVERFLOW clarification | ||
Revision 1.2.0 | 2013-06-14 | Revised by: NTD |
New Release with ach_cancel, achcop, achlog | ||
Revision 1.1.0-1 | 2013-04-16 | Revised by: NTD |
Few fixes and add Xenomai note | ||
Revision 1.1.0 | 2013-02-12 | Revised by: NTD |
Initial Release |
This is the manual for the Ach IPC library.
Ach is an Inter-Process Communication (IPC) mechanism and library, intended for communication in real-time systems that sample data from physical processes. Ach eliminates the Head-of-Line Blocking problem for applications that always require access to the newest message. Source code is provided under the 2-clause BSD license.
Ach provides a message bus or publish-subscribe style of communication between multiple writers and multiple readers. A real-time system will generally have multiple Ach channels across which individual data samples are published. The messages sent on a channel are simple byte arrays, so arbitrary data may be transmitted such as text, images, and binary control messages. Each channel is implemented as two circular buffers, (1) a data buffer with variable sized entries and (2) an index buffer with fixed-size elements indicating the offsets into the data buffer. These two circular buffers are written in a channel-specific POSIX shared memory file. This frees users from managing synchronization, which is contained within the Ach library.
The Ach interface consists of the following procedures:
ach_create
Create the shared memory region and initialize its data structures
ach_open
Open the shared memory file and initialize process local channel counters
ach_put
Insert a new message into the channel
ach_get
Receive a message from the channel
ach_close
Close the shared memory file
Channels must be created before they can be opened. Creation may be done directly by either the reading or writing process, or it may be done via the shell command, ach mk channel_name, before the reader or writer start. After the channel is created, each reader or writer must open the channel before it can individually get or put messages.
The Ach API functions use return status codes to indicate either successful completion or an error condition. The following codes are defined:
ACH_OK
Operation completed successfully.
ACH_OVERFLOW
Destination was too small. If returned from
ach_put
, the channel is too small to hold
the message you attempted to send. If returned from
ach_get
, the buffer you passed to receive
the message was too small to hold the result. If the channel is
too small, you should specify a larger nominal message size when
creating the channel. If the buffer is to small, pass in a
bigger buffer.
ACH_INVALID_NAME
An invalid channel name.
ACH_BAD_SHM_FILE
An invalid channel file.
ACH_FAILED_SYSCALL
A system call was unsuccessful. Check the
errno
to determine what went wrong.
ACH_STALE_FRAMES
No new data has be published to channel.
ACH_MISSED_FRAME
The receiver has skipped over some frames (which may no longer be stored in the channel). You will still get either the latest or the oldest frame, depending on which you requested.
ACH_TIMEOUT
Timeout occurred. No data received.
ACH_CANCELED
Message wait was canceled.
ACH_EEXIST
Channel already exists.
ACH_ENOENT
Channel does not exist.
ACH_EACCES
No permission to access the channel file. You will want to either run as the appropriate user or set the channel permissions appropriately.
ACH_EINVAL
An invalid parameter was passed.
ACH_CORRUPT
Corruption of the channel file detected.
ACH_BAD_HEADER
Invalid header line in the network protocol.
ACH_FAULT
Memory fault in a system call.
ACH_EINTR
System call interrupted. This code is only used internally and not returned to library clients.
ACH_BUG
An unexpected or inconsistent condition has occurred. If the software is correct, you should never see this code. If you do see this code, please contact the authors.
You can create a channel by directly calling the
ach_create
; however, it is generally preferable to
call the ach (see also) from the shell as this is less
likely to result in races between multiple processes needing to
access the channel.
enum ach_status ach_create
(const char *channel_name, size_t frame_cnt, size_t frame_size, ach_create_attr_t *attr);
Channels are created using the ach_create
function. A channel must be created before it can be opened.
This function will create and initialize a POSIX shared memory
file for the channel. The channel can then be opened by passing
the same channel_name
to
ach_open
.
Example 1. Creating a Channel
enum ach_status r = ach_create( "my_channel", 10, 512, NULL ); if( ACH_OK != r ) { fprintf( stderr, "Could not create channel: %s\n", ach_result_to_string(r) ); exit(EXIT_FAILURE); }
Note that the message size given here is not a strict constraint. Individual messages are allowed to be smaller or larger than this value. The only hard constraint is that a total of frame_cnt*frame_size is allocated for all message data in the channel. Thus, the total data required by all buffered messages cannot exceed this value (older messages are overwritten), and no individual message may be larger than this value.
Opens a channel for use within this process. The channel must be opened before messages can be sent or received
The ach_put
function writes
a new message to the channel. This will go in the next open space
in the circular array. If there is insufficient unused space in
the circular array, then the oldest entry or entries will be
overwritten. If the entire circular array is too small for the
new message, the message is not written and
ACH_OVERFLOW
is returned.
enum ach_status ach_get
(ach_channel_t *channel, void *buf, size_t buf_size, size_t *frame_size, const struct timespec * restrict abstime, int options);
The ach_get
is used to received data
from a channel. The options
parameter controls
the behavior of this function and is the bitwise or of the
following values.
ACH_O_NONBLOCK
Return immediately with ACH_STALE_FRAMES if no new data is present in the channel. Exclusive with ACH_O_WAIT.
ACH_O_WAIT
Wait until a new message is posted. Default behavior without this flag is to return immediately. Exclusive with ACH_O_NONBLOCK.
ACH_O_FIRST
Return the oldest message in the channel. Exclusive with ACH_O_LAST.
ACH_O_LAST
Return the newest message. Exclusive with ACH_O_FIRST.
ACH_O_ABSTIME
Timeout is an absolute time. Exclusive with ACH_O_RELTIME.
ACH_O_RELTIME
Timeout is a relative time. Exclusive with ACH_O_ABSTIME.
ACH_O_COPY
Copy the message to the buffer, even if we have
already seen it. Only valid when
ACH_O_WAIT
is not given. Default behavior
without this flag is to return
ACH_STALE_FRAMES
when there are no new
messages.
Example 4. Poll for oldest message
This will get the next unseen message from the channel.
size_t frame_size; enum ach_status r; r = ach_get( &channel, &my_msg, my_msg_size, &frame_size, NULL, ACH_O_NONBLOCK | ACH_O_FIRST ); if( ACH_MISSED_FRAME == r ) { printf("Missed a/some messages(s)\n"); } else if( ACH_STALE_FRAMES == r ) { printf("No new data\n"); } else if( ACH_OK != r ) { syslog( LOG_ERR, "Unable to get a message: %s\n", ach_result_to_string(r) ); } else if( frame_size != sizeof(my_msg) ) { syslog( LOG_WARNING, "Unexpected message size %" PRIuPTR ", expecting %" PRIuPTR "\n", frame_size, sizeof(my_msg) ); }
Example 5. Wait for newest message
This will get the newest message from the channel. If no unseen messages are in the channel, it will wait forever until one is posted.
enum ach_status r = ach_get( &channel, &my_msg, my_msg_size, &frame_size, NULL, ACH_O_WAIT | ACH_O_LAST );
Example 6. Timed wait for newest message
This will get the newest message from the channel. If no unseen messages are in the channel, it will up to one second until one is posted.
struct timespec t; clockid_t clock; enum ach_status r; /* Get the channel clock */ r = ach_channel_clock(&channel,&clock); if( ACH_OK != r ) { fprintf(stderr, "Could not get channel clock: %s\n", ach_result_to_string(r)); exit(EXIT_FAILURE); } /* Get current time and set the timeout. */ clock_gettime( clock, &t ); t.tv_sec += 1; /* Get the message */ enum ach_status r = ach_get( &channel, &my_msg, my_msg_size, &frame_size, &t, ACH_O_WAIT | ACH_O_LAST | ACH_O_ABSTIME ); if( ACH_TIMEOUT == r ) { fprintf(stdout, "call to ach_get timed out\n"); }
Does anybody know what time it is? | |
---|---|
While ach channels default to using
|
To interrupt an in-progress call to ach_get
that has blocked waiting for a new message, use the
ach_cancel
function. This will cause
ach_get
to return
ACH_CANCELED
.
Example 7. Cancel waits upon a signal
This will install a signal handler to cancel waits when a signal is received.
ach_channel_t channel; ... static void sighandler_cancel ( int sig ) { enum ach_status r = ach_cancel(&channel, NULL); if( ACH_OK != r ) exit(EXIT_FAILURE); } ... void setup_sighandler_cancel ( int sig ) { struct sigaction act; memset(&act, 0, sizeof(act)); act.sa_handler = &sighandler_cancel; if( sigaction(sig, &act, NULL) ) exit(EXIT_FAILURE); }
Example 8. Cancel waits in another thread
This will notify waits in another thread to cancel.
ach_channel_t channel; ... ach_cancel_attr_t attr; ach_cancel_attr_init(&attr); attr.async_unsafe = 1; /* permit functions that are unsafe in signal handlers */ enum ach_status r = ach_cancel(&channel, &attr); if( ACH_OK != r ) exit(EXIT_FAILURE);
Set the POSIX permission bits of the channel to
mode
. Note that any channel access requires
both the read (4) and write (2) bits to be set, because we must
write in order to hold the mutex. The executable bit (1) is
irrelevant.
Ach includes bindings for Common Lisp using CFFI. They can be loaded via ASDF.
Ach includes bindings for Python using a C extension module and wrapper class.
The module uses Python's Buffer Protocol to
obtain a serialized representation for Python objects. This
will work with string
,
bytearray
,
ctypes.Structure
and other types which
implement the Buffer Protocol.
The ach command allows creation and deletion of channels from the shell.
ach {mk | rm | chmod | dump | file} [octal_mode] {chanel_name} [-o octal_mode] [-m frame_count] [-n frame_size] [-t] [-v] [-V] [-?]
Example 12. Create a channel
Create channel named "my_channel" with slots for 10 messages which have a nominal size of 64 bytes.
ach mk my_channel -m 10 -n 64
Example 14. Create a world-accessible channel
Create channel named "my_channel" which is world-accessible (permisions 666).
ach mk my_channel -m 10 -n 64 -o 666
Ach includes a Linux kernel module which implements channels in
kernel-space. This allows library clients to wait for messages
on multiple channels (and in combination with any other file
descriptors) using the poll()
or
select()
family of functions.
The most convenient way to build the kernel module will probably be with DKMS (Dynamic Kernel Module System). DKMS enables building and rebuilding of out-of-tree kernel modules (such as ach, device drivers, etc.) when the kernel is upgraded. On Debian and Ubuntu, the package manager automatically invokes DKMS to rebuild the registered modules. To setup ach for DKMS, call configure with ./configure --enable-dkms-build, and the module sources for will be installed and built when you call make install.
Alternatively, you can build the module in the ach source tree directly by calling ./configure --enable-kbuild --disable-dkms. If your kernel headers are not in the typical location and/or do not match the currently running kernel, you may to specify the kernel release and header directory.
Example 18. Manually specifying kernel information for kbuild
./configure --enable-kbuild \ --disable-dkms \ KERNELRELEASE=3.13.0-44-lowlatency \ KDIR=/usr/src/linux-headers-3.13.0-44-lowlatency make sudo make install
After the kernel module is build and installed, you need to link
it into the kernel. This is done with the
modprobe command. The kernel module can be
removed with rmmod. The module takes one
optional parameter, max_devices
which
specifies the maximum number of kernel channels that can be
created.
Kernel channels can be created using the ach
command by passing the -k
flag.
By default, kernel channels can only be created and accessed by root. However, you can use udev to control kernel channel permissions. The following example will set the group ownership of all ach devices to the ach group. The file needs to be placed in /etc/udev/rules.d/.
Example 24. udev rules file to set channel group
# /etc/udev/rules.d/10-ach.rules # Control permissions for ach character devices # Set the permissions of the ach control device KERNEL=="achctrl", GROUP=="ach", MODE=="660" # Set the permissions of ach channel devices KERNEL=="ach-*", GROUP=="ach", MODE=="660" # Create symbolic links for ach channel devices KERNEL=="ach-*", SYMLINK+="ach/%k"
Kernel channels support waiting for multiple events using the
poll
, select
and
similar functions. These functions require the file descriptor
for the channel.
Example 25. Obtaining the channel file descriptor
ach_channel_t channel; int channel_fd; enum ach_status r; /* Open channel */ /* ... */ /* Get Channel File Descriptor */ r = ach_channel_fd( &channel, &channel_fd ); if( ACH_OK != r ) { fprintf(stderr, "could not get file descriptor for channel '%s': %s\n", names[i], ach_result_to_string(r)); exit(EXIT_FAILURE); }
The channel file descriptor can then be passed to
poll
, select
, etc. to
determine when there is new data to read. Note that channels
are always ready to write.
Example 26. Complete poll() example
#include <stdlib.h> #include <pthread.h> #include <inttypes.h> #include <ach.h> #include <stdio.h> #include <poll.h> #include <unistd.h> int main(int argc, char **argv) { const char *names[] = {"channel-0", "channel-1"}; const size_t n = sizeof(names) / sizeof(names[0]); ach_channel_t channel[n]; struct pollfd pfd[n]; for( size_t i = 0; i < n; i ++ ) { /* Open Channel */ enum ach_status r = ach_open( &channel[i], names[i], NULL ); if( ACH_OK != r ) { fprintf(stderr, "could not open channel '%s': %s\n", names[i], ach_result_to_string(r)); exit(EXIT_FAILURE); } /* Get Channel File Descriptor */ r = ach_channel_fd( &channel[i], &pfd[i].fd ); if( ACH_OK != r ) { fprintf(stderr, "could not get file descriptor for channel '%s': %s\n", names[i], ach_result_to_string(r)); exit(EXIT_FAILURE); } /* Set events to poll for */ pfd[i].events = POLLIN; } /* read forever */ for(;;) { /* poll for new messages */ int r_poll = poll( pfd, n, -1 ); if( r_poll < 0 ) { perror("poll"); exit(EXIT_FAILURE); } /* find channels with new data */ for( size_t i = 0; i < n && r_poll > 0; i++ ) { if( (pfd[i].revents & POLLIN) ) { /* There's new data on this channel */ char buf[512]; size_t frame_size; enum ach_status r = ach_get( &channel[i], buf, sizeof(buf), &frame_size, NULL, ACH_O_NONBLOCK | ACH_O_FIRST ); switch(r) { case ACH_OK: case ACH_MISSED_FRAME: { /* this example just writes it to stdout */ ssize_t wr = write( STDOUT_FILENO, buf, frame_size ); if( wr < 0 ) { perror("write"); exit(EXIT_FAILURE); } break; } default: fprintf( stderr, "Error getting data from '%s': %s\n", names[i], ach_result_to_string(r)); exit(EXIT_FAILURE); } r_poll--; } } } return 0; }
The |
While the primary design goal of Ach is low-latency, single host IPC, the achd daemon additionally provides a way to relay messages over the network.
achd serve
The achd server should be setup to run from the inetd super-server. Some distributions use the alternative xinetd, which is configured differently.
Inetd and xinetd work by listening for incoming connections on all the ports specified in their configuration. When (x)inetd recieves a new connection, it forks off the process as given in the configuration file, (in this case achd), and hooks the standard input and output of that process to the socket connection. |
On Debian-based systems (including Ubuntu and Mint), you can use the openbsd-inetd package, installed with apt-get install openbsd-inetd. Alternatively, you can install xinetd with with apt-get install xinetd. |
You should add the following line to /etc/inetd.conf, assuming you have installed ach under /usr/local/:
#/etc/inetd.conf 8076 stream tcp nowait nobody /usr/local/bin/achd /usr/local/bin/achd serve
On Debian-based systems where you have installed ach via the package manger, you can enable achd in inetd by running dpkg-reconfigure ach-utils. |
You will probably need to restart inetd after editing the configuration file. How to do this depends on the init system used by your operating system and the particular inetd you are using. Here are some options which may work:
/etc/init.d/inetd restart
service openbsd-inetd restart
If xinetd is run with the with the inetd_compat flag, then you can use a similar configuration to the traditional inetd.
#/etc/inetd.conf achd stream tcp nowait nobody /usr/local/bin/achd /usr/local/bin/achd serve
Otherwise, you can add the following stanza to your xinetd configuration. Typically, this would go in the per-service file /etc/xinetd.d/achd.
#/etc/xinetd.d/achd service achd { flags = REUSE socket_type = stream protocol = tcp port = 8076 wait = no user = nobody server = /usr/local/bin/achd server_args = serve disable = no }
In addition, you will need to list the service in /etc/services.
#/etc/services achd 8076/tcp
Then, tell xinetd to reload its configuration
/etc/init.d/xinetd restart
service xinetd reload
Now, you can test this setup by telnetting to port 8076 on the server: telnet SERVER 8076. If you type in some gibberish (e.g. asdf) followed by a newline, then you should see something like this:
Escape character is '^]'. asdf status: 14 # ACH_BAD_HEADER message: malformed header . Connection closed by foreign host.
Which will mean that achd is properly setup.
If achd is not operating properly, check the log files on your machine for more information. The log output will typically be stored in a file such as /var/log/daemon.log, though this depends on how syslog is configured on your system. You can also add the -vv flags to both server achd (in inetd.conf) the client achd commands to enable debugging output. Syslog may store the debug output in /var/log/debug.log. |
For the achd server to access any channel, it must be readable and writable by the user under which achd runs ("nobody" in the above example line for inetd.conf). If you trust all users on the local machine, you can set the channel permissions to "666", but please note that this is not a secure configuration. An alternative is to create a separate user account for achd and ensure that channels are readable and writable by that account or one of its groups. |
Currently, achd performs no authentication, access control, or encryption (this may be added in the future). If security is a concern in your application, it must currently be addressed at the network level, e.g. by physical isolation, firewalling, a VPN such as IPSec, and/or SSH tunnelling. |
From the client machine, you can run achd to push or pull message to or from the server.
achd {push | pull} {hostname} {chanel_name} [-t tcp|udp] [-p port] [-z remote_channel_name] [-u microseconds] [-d] [-r] [-q] [-v] [-V] [-?]
Example 28. Push channel to server and retry dropped connections
achd -r pull server_name channel_name
To add both encryption and compression, you can also forward achd over SSH. This will tunnel the achd TCP connection through the encrypted and compressed SSH connection.
Multicast DNS (mDNS) can be used to browse the Ach channels on the local network and to lookup the hostname for an channel. mDNS names are generally in the .local domain To advertise an Ach channel, it is registered as a service with the local mDNS server.
Example 35. Lookup channel origin in achd client
achd pull channel_name
This performs an mDNS query to determine the origin of the specified channel, then connects to that host.
mDNS is a peer-to-peer variation on DNS that reduces the need for prior configuration by automatically discovering services on the local network. Each peer runs an mDNS server, and services, e.g., file shares, printers, Ach channels, are located by sending a multicast query. Popular implementations of mDNS are Bonjour and Avahi. Some communication systems, which were generally developed prior to mDNS, use specialized naming services. For example, ONC RPC uses the port mapper and CORBA has its own naming service. |
The achcop program can be used to start daemons, and restart them if they die.
achcop [-P cop-pid-file] [-p child-pid-file] [-e stderr-file] [-o stdout-file] [-d] [-r] [-v] [-V] [-?] -- {child-program} [child-arguments...]
Detach process from terminal and run in background (daemonize), -d flag.
Lock PID files and write PIDs, -P and -p flags.
Reconnect if connection is broken, -r flag.
Redirect standard output and standard error to files, -o and -e flags.
Restart process if SIGHUP
is received
Signals are how POSIX user programs are
notified of asynchrous events. They are similar in spirit to
low-level interrupts and interrupt service routines. When a
process receives a signal, its execution is interrupted, and
it runs the specified handler for that signal. For example,
when you hit Control-C at a terminal,
the process in the forground receives the interrupt signal,
|
Achcop will respond to the following
signals. You can send a signal with the kill
shell command or the kill
C function.
SIGTERM
, SIGINT
Achcop sends SIGTERM to the child, waits for child to exit, and return child's exit status
SIGHUP
Achcop sends SIGTERM to the child, waits for child to exit, and restarts the child
SIGUSR1
Indicates that the child has started successfully. If the child does not send SIGUSR1 and continues running past an initial timeout, achcop assumes it started successfully.
SIGUSR2
Indicates that the child has failed to start properly. If the child fails on its first attempt and before a timeout has elapsed, it will not be restarted. If your child does not send this signal, achcop may wait a few seconds to ensure that the child runs successfully.
The following shell lines show how one might run achcop.
The following C code will notify a parent achcop of your child's success or failure. This is an optional step. Achcop will operate properly without these signals; however, providing the signals means achcop does not need to wait for a brief timeout to ensure that your child continues operating.
Example 39. Signal achcop of child success or failure
/* only send signal if running under achcop */ if( my_parent_is_achcop ) { int sig; /* determine signal to send */ if( initialized_correctly ) { sig = SIGUSR1; /* success */ } else { sig = SIGUSR2; /* failure */ } /* send the signal */ int r = kill( getppid(), sig ); /* check that we sent the signal */ if( r ) { perror("Couldn't signal parent"); exit(EXIT_FAILURE); } }
Note that the default signal handlers for
SIGUSR1
and SIGUSER2
terminate the process; therefore, if you send these signals to
your shell, your shell will terminate.
The achlog program can be used to log messages from an ach channel to disk.
achlog [-z] [-V] [-?] {channels...}
The Log format is similar to the TCP network protocol for achpipe. The log files begins with a sequence of ASCII key-value headers, followed by a single "." character, followed by a sequence of messages.
Example 40. achlog headers
ACHLOG channel-name: foo log-version: 0 log-time-ach: 6226133.801454417 log-time-real: 1371238788.956337009 # Fri Jun 14 15:39:48 2013 local-host: daneel user: ntd # Neil T. Dantam .
Messages are framed in the file with the following C struct:
struct { uint8_t reserved[8]; /* reserved for future use */ uint8_t size_bytes[8]; /* size of data, stored little endian */ uint8_t data[1]; /* flexible array containing size_bytes of data */ };
The performance of your real-time application will be highly dependent on your underlying hardware and operating system. It is quite likely than any off-the-shelf hardware or operating system you use will be tuned for something other than low-latency real-time operation, e.g. maximum thoughput or minimum power consumption. However, there are several paremeters to adjust to improve your performance.
Many current CPUs will lower their frequency when idle in order to reduce consumption. This is great for extending your laptop's battery life but poor for minimizing latency. On an x86/amd64 PC, you can likely disable frequency scaling from the BIOS, which will reduce your system's latency. In the author's experience, simply changing to a "performance" CPU governer from within the operating system did NOT provide the same latency reduction; you need to disable freqency scaling from the BIOS.
General purpose operating systems (e.g. GNU/Linux, FreeBSD) are designed to maximize throughput and perhaps to reduce latency to "human-tolerable" (0.1s) levels. The worst-case performance may be sacrificed in order to improve average-case performance. For a real-time application, it is generally worst-case performance that matters rather than average case, and latency is more important than throughput.
If using a Linux kernel, you can apply the PREEMPT_RT patch to allow a fully-preemptible kernel. Without this, a low priority task in the middle of a system call may block a high priority task from running until the system call completes. With a fully-preemptible kernel, the high priority task gets to run immediately.
Xenomai is a dual-kernel real-time operating system. A hard real-time kernel, Adeos, handles interrupts and a full Linux kernel runs along-side, permitting normal Linux processes to run directly. Real-time processes must use a specific Xenomai API skin. One of these skins is POSIX, and this POSIX interface is sufficient to use Ach for real-time Xenomai processes.
There is one important caveat when using Ach on Xenomai. Because of the dual-kernel approach, Xenomai scheduling primitives -- mutexes and condition variables -- are different from the Linux primitives; a Linux process and a Xenomai process cannot synchronize on the same condition variable. Therefore, Ach compiled for Xenomai cannot directly share channels with Ach compiled for Linux. If you want a Linux process and a Xenomai process on the same machine to communicate via Ach, you can use achd to forward the messages between the two sides. This issue does not affect messages forwarded between different machines. |
While Ach development has focused on Linux, it should be portable to other operating systems which provide the POSIX real-time extensions. In particular, Ach depends on POSIX shared memory and process-shared mutexes and condition variables. Contributions to increase portability are extremely welcome.
You can use the included achbench command to
test performance on your system while you try different
configurations. This program will fork
a
specified number of publishers and subscribers and measure the
latency of message passing.
POSIX provides a rich variety of IPC mechanisms, but none of them are ideal for real-time control. The fundamental difference is that as soon as a new sample of the signal (in the oscilloscope sense, not a POSIX signal) is produced, nearly everything no longer cares about older samples. Thus, we want to always favor new data over old data whereas general-purpose POSIX IPC favors the old data. This problem is typically referred to as Head of Line (HOL) Blocking. The exception to this is POSIX shared memory. However, synchronization of shared memory is a difficult programming problem, making the typical and direct use of POSIX shared memory unfavorable for developing robust systems. Furthermore, some parts of the system, such as logging, may need to access older samples, so this also should be permitted at least on a best-effort basis.
The core data structure of an Ach channel is a pair of circular arrays located in the POSIX shared memory file. There is a data array and an index array. The data array contains variable sized entries which store the actual message frames sent through the Ach channel. The index array contains fixed size entries where each entry has both an offset into the data array and the length of that data entry. A head offset into each array indicates both the place to insert the next data and the location of the most recent message frame. This pair of circular arrays allows us to find the variable sized message frames by first looking at a known offset in the fixed-sized index array.
Access to the channel is synchronized using a mutex and condition variable. This allows readers to either periodically poll the channel for new data or to wait on the condition variable until a writer has posted a new message. Using a read/write lock instead would have allowed only polling. Additionally, synchronization using a mutex prevents starvation and enables proper priority inheritance between processes, important to maintaining real-time performance.
Two procedures compose the core of ach:
ach_put
and ach_get
.
The procedure ach_put
inserts new messages
into the channel. Its function is analogous to
write
, sendmsg
, and
mq_send
. The procedure is given a pointer
to the shared memory region for the channel and a byte array
containing the message to post. There are four broad steps to
the procedure:
Get an index entry. If there is at least one free index entry, use it. Otherwise, clear the oldest index entry and its corresponding message in the data array.
Make room in the data array. If there is enough room already, continue. Otherwise, repeatedly free the oldest message until there is enough room.
Copy the message into data array.
Update the offset and free counts in the channel structure.
The procedure ach_get
receives a message
from the channel. Its function is analogous to
read
, recvmsg
, and
mq_receive
. The procedure takes a pointer
to the shared memory region, a storage buffer to copy the
message to, the last message sequence number received, the next
index offset to check for a message, and option flags indicating
whether to block waiting for a new message and whether to return
the newest message bypassing any older unseen messages.
If we are to wait for a new message and there is no new message, then wait. Otherwise, if there are no new messages, return a status code indicating this fact.
Find the index entry to use. If we are to return the newest message, use that entry. Otherwise, if the next entry we expected to use contains the next sequence number we expect to see, use that entry. Otherwise, use the oldest entry.
According to the offset and size from the selected index entry, copy the message from the data array into the provided storage buffer.
Update the sequence number count and next index entry offset for this receiver.
Ach is orthogonal to the issue of serialization: it only handles raw byte arrays. Thus, you can choose whatever type of serialization is most appropriate for you application. For maximum performance, it may be appropriate to define message types as simple C structures. If more flexibility is required, there are a variety of other options to choose from: XDR, ASN.1, Protocol Buffers, etc.
This manual is written in Docbook XML.
Redistribution and use in source (XML DocBook) and 'compiled' forms (SGML, HTML, PDF, PostScript, RTF and so forth) with or without modification, are permitted provided that the following conditions are met:
Redistributions of source code (XML DocBook) must retain the above copyright notice, this list of conditions and the following disclaimer as the first lines of this file unmodified.
Redistributions in compiled form (transformed to other DTDs, converted to PDF, PostScript, RTF and other formats) must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.