Ach IPC User Manual

Neil T Dantam

2015-02-02

Revision History
Revision 2.0.02015-02-02Revised by: NTD
New Release with kernel channels and improved mDNS integration
Revision 1.2.12014-04-07Revised by: NTD
New Release with achd period, SSH example, ACH_OVERFLOW clarification
Revision 1.2.02013-06-14Revised by: NTD
New Release with ach_cancel, achcop, achlog
Revision 1.1.0-12013-04-16Revised by: NTD
Few fixes and add Xenomai note
Revision 1.1.02013-02-12Revised by: NTD
Initial Release

This is the manual for the Ach IPC library.

Ach is an Inter-Process Communication (IPC) mechanism and library, intended for communication in real-time systems that sample data from physical processes. Ach eliminates the Head-of-Line Blocking problem for applications that always require access to the newest message. Source code is provided under the 2-clause BSD license.


Table of Contents
1. Introduction
2. Libach API
2.1. Status Codes
2.2. Creating a channel
2.3. Opening a channel
2.4. Sending Data
2.5. Receiving Data
2.6. Cancel message wait
2.7. Closing a channel
2.8. Deleting a channel
2.9. Channel Permissions
2.10. Ignore old messages
3. Language Bindings
3.1. Common Lisp
3.2. Python
4. ach Shell Tool
5. Linux Kernel Module
5.1. Kernel Module Installation
5.2. Creating Kernel Channels
5.3. Kernel Channel Permissions
5.4. Waiting on Multiple Kernel Channels
6. achd Network Daemon
6.1. Server
6.1.1. Traditional Inetd Configuration
6.1.2. Xinetd Configuration
6.1.3. Testing Server Configuration
6.2. Client
6.3. Locating Channels via mDNS
7. achcop Watchdog Utility
7.1. Features
7.2. Signals
7.3. Examples
7.3.1. Shell Commands
7.3.2. Signaling Achcop from Child
8. achlog Logging Utility
8.1. Log Format
8.2. Usage
9. Performance Tuning
9.1. Disable CPU frequency scaling
9.2. Use a Real-Time Operating System
9.2.1. Linux PREEMPT_RT
9.2.2. Linux / Xenomai
9.2.3. Other Real-Time Operating Systems
9.3. Benchmark your system
10. Theory of Operation
10.1. Data Structure
10.2. Core Procedures
10.2.1. ach_put
10.2.2. ach_get
10.3. Serialization
Colophon

1. Introduction

Ach provides a message bus or publish-subscribe style of communication between multiple writers and multiple readers. A real-time system will generally have multiple Ach channels across which individual data samples are published. The messages sent on a channel are simple byte arrays, so arbitrary data may be transmitted such as text, images, and binary control messages. Each channel is implemented as two circular buffers, (1) a data buffer with variable sized entries and (2) an index buffer with fixed-size elements indicating the offsets into the data buffer. These two circular buffers are written in a channel-specific POSIX shared memory file. This frees users from managing synchronization, which is contained within the Ach library.

The Ach interface consists of the following procedures:

ach_create

Create the shared memory region and initialize its data structures

ach_open

Open the shared memory file and initialize process local channel counters

ach_put

Insert a new message into the channel

ach_get

Receive a message from the channel

ach_close

Close the shared memory file

Channels must be created before they can be opened. Creation may be done directly by either the reading or writing process, or it may be done via the shell command, ach mk channel_name, before the reader or writer start. After the channel is created, each reader or writer must open the channel before it can individually get or put messages.


2. Libach API

2.1. Status Codes

The Ach API functions use return status codes to indicate either successful completion or an error condition. The following codes are defined:

ACH_OK

Operation completed successfully.

ACH_OVERFLOW

Destination was too small. If returned from ach_put, the channel is too small to hold the message you attempted to send. If returned from ach_get, the buffer you passed to receive the message was too small to hold the result. If the channel is too small, you should specify a larger nominal message size when creating the channel. If the buffer is to small, pass in a bigger buffer.

ACH_INVALID_NAME

An invalid channel name.

ACH_BAD_SHM_FILE

An invalid channel file.

ACH_FAILED_SYSCALL

A system call was unsuccessful. Check the errno to determine what went wrong.

ACH_STALE_FRAMES

No new data has be published to channel.

ACH_MISSED_FRAME

The receiver has skipped over some frames (which may no longer be stored in the channel). You will still get either the latest or the oldest frame, depending on which you requested.

ACH_TIMEOUT

Timeout occurred. No data received.

ACH_CANCELED

Message wait was canceled.

ACH_EEXIST

Channel already exists.

ACH_ENOENT

Channel does not exist.

ACH_EACCES

No permission to access the channel file. You will want to either run as the appropriate user or set the channel permissions appropriately.

ACH_EINVAL

An invalid parameter was passed.

ACH_CORRUPT

Corruption of the channel file detected.

ACH_BAD_HEADER

Invalid header line in the network protocol.

ACH_FAULT

Memory fault in a system call.

ACH_EINTR

System call interrupted. This code is only used internally and not returned to library clients.

ACH_BUG

An unexpected or inconsistent condition has occurred. If the software is correct, you should never see this code. If you do see this code, please contact the authors.


2.2. Creating a channel

You can create a channel by directly calling the ach_create; however, it is generally preferable to call the ach (see also) from the shell as this is less likely to result in races between multiple processes needing to access the channel.

enum ach_status ach_create(const char *channel_name, size_t frame_cnt, size_t frame_size, ach_create_attr_t *attr);

Channels are created using the ach_create function. A channel must be created before it can be opened. This function will create and initialize a POSIX shared memory file for the channel. The channel can then be opened by passing the same channel_name to ach_open.

Example 1. Creating a Channel


enum ach_status r = ach_create( "my_channel", 10, 512, NULL );
if( ACH_OK != r ) {
    fprintf( stderr, "Could not create channel: %s\n", ach_result_to_string(r) );
    exit(EXIT_FAILURE);
}
      

Note that the message size given here is not a strict constraint. Individual messages are allowed to be smaller or larger than this value. The only hard constraint is that a total of frame_cnt*frame_size is allocated for all message data in the channel. Thus, the total data required by all buffered messages cannot exceed this value (older messages are overwritten), and no individual message may be larger than this value.


2.3. Opening a channel

enum ach_status ach_open(ach_channel_t *channel, const char *channel_name, ach__attr_t *attr);

Opens a channel for use within this process. The channel must be opened before messages can be sent or received

Example 2. Opening a Channel


ach_channel_t channel;
enum ach_status r = ach_open( &channel, "my_channel", NULL );
if( ACH_OK != r ) {
    syslog( LOG_ERR, "Could not open channel: %s\n", ach_result_to_string(r) );
    exit(EXIT_FAILURE);
}
      

2.4. Sending Data

enum ach_status ach_put(ach_channel_t *channel, const void *buf, size_t cnt);

The ach_put function writes a new message to the channel. This will go in the next open space in the circular array. If there is insufficient unused space in the circular array, then the oldest entry or entries will be overwritten. If the entire circular array is too small for the new message, the message is not written and ACH_OVERFLOW is returned.

Example 3. Opening a Channel


struct my_msg_type my_msg;
/* Fill in my_msg with useful data */
enum ach_status r = ach_put( &channel, &my_msg, sizeof(my_msg) );
if( ACH_OK != r ) {
    syslog( LOG_ERR, "Could not put data: %s\n", ach_result_to_string(r) );
}
      

2.5. Receiving Data

enum ach_status ach_get(ach_channel_t *channel, void *buf, size_t buf_size, size_t *frame_size, const struct timespec * restrict abstime, int options);

The ach_get is used to received data from a channel. The options parameter controls the behavior of this function and is the bitwise or of the following values.

ACH_O_NONBLOCK

Return immediately with ACH_STALE_FRAMES if no new data is present in the channel. Exclusive with ACH_O_WAIT.

ACH_O_WAIT

Wait until a new message is posted. Default behavior without this flag is to return immediately. Exclusive with ACH_O_NONBLOCK.

ACH_O_FIRST

Return the oldest message in the channel. Exclusive with ACH_O_LAST.

ACH_O_LAST

Return the newest message. Exclusive with ACH_O_FIRST.

ACH_O_ABSTIME

Timeout is an absolute time. Exclusive with ACH_O_RELTIME.

ACH_O_RELTIME

Timeout is a relative time. Exclusive with ACH_O_ABSTIME.

ACH_O_COPY

Copy the message to the buffer, even if we have already seen it. Only valid when ACH_O_WAIT is not given. Default behavior without this flag is to return ACH_STALE_FRAMES when there are no new messages.

Example 4. Poll for oldest message

This will get the next unseen message from the channel.


size_t frame_size;
enum ach_status r;
r  = ach_get( &channel, &my_msg, my_msg_size, &frame_size,
              NULL, ACH_O_NONBLOCK | ACH_O_FIRST );
if( ACH_MISSED_FRAME == r ) {
    printf("Missed a/some messages(s)\n");
} else if( ACH_STALE_FRAMES == r ) {
    printf("No new data\n");
} else if( ACH_OK != r ) {
    syslog( LOG_ERR, "Unable to get a message: %s\n", ach_result_to_string(r) );
} else if( frame_size != sizeof(my_msg) ) {
    syslog( LOG_WARNING, "Unexpected message size %" PRIuPTR ", expecting %" PRIuPTR "\n",
            frame_size, sizeof(my_msg) );
}
      

Example 5. Wait for newest message

This will get the newest message from the channel. If no unseen messages are in the channel, it will wait forever until one is posted.


enum ach_status r = ach_get( &channel, &my_msg, my_msg_size, &frame_size, NULL, ACH_O_WAIT | ACH_O_LAST );
      

Example 6. Timed wait for newest message

This will get the newest message from the channel. If no unseen messages are in the channel, it will up to one second until one is posted.


struct timespec t;
clockid_t clock;
enum ach_status r;
/* Get the channel clock */
r = ach_channel_clock(&channel,&clock);
if( ACH_OK != r ) {
  fprintf(stderr, "Could not get channel clock: %s\n",
          ach_result_to_string(r));
  exit(EXIT_FAILURE);
}
/* Get current time and set the timeout. */
clock_gettime( clock, &t );
t.tv_sec += 1;
/* Get the message */
enum ach_status r = ach_get( &channel, &my_msg, my_msg_size, &frame_size, &t,
                             ACH_O_WAIT | ACH_O_LAST | ACH_O_ABSTIME );
if( ACH_TIMEOUT == r ) {
  fprintf(stdout, "call to ach_get timed out\n");
}
      

NoteDoes anybody know what time it is?
 

While ach channels default to using CLOCK_MONOTONIC for timed waits, the issue of determining the correct time is fraught with complications. Traditionally, unix time -- seconds since January 1, 1970 -- has been defined based on UTC. This is given to nanosecond precision with CLOCK_REALTIME (nothing to do with real-time-as-in-low-latency-or-motion-control). However, UTC and thus typically CLOCK_REALTIME is discontinuous. It may be reset by the operator, by the NTP daemon, and it is defined as discontinous when leap seconds occur. This is bad when one wants to use the clock for periodic events as is likely for motion control. CLOCK_MONOTONIC is not discontinous, but even it has issues. The NTP daemon may slew this clock, speeding it up or slowing it down on Linux by 0.5ms per second. Linux does provide the CLOCK_MONOTONIC_RAW which is not slewed, but is also not portable.


2.6. Cancel message wait

enum ach_status ach_cancel(ach_channel_t *channel, const ach_cancel_attr_t *attr);

To interrupt an in-progress call to ach_get that has blocked waiting for a new message, use the ach_cancel function. This will cause ach_get to return ACH_CANCELED.

Example 7. Cancel waits upon a signal

This will install a signal handler to cancel waits when a signal is received.


ach_channel_t channel;

...

static void sighandler_cancel ( int sig ) {
    enum ach_status r = ach_cancel(&channel, NULL);
    if( ACH_OK != r ) exit(EXIT_FAILURE);
}

...

void setup_sighandler_cancel ( int sig ) {
    struct sigaction act;
    memset(&act, 0, sizeof(act));
    act.sa_handler = &sighandler_cancel;
    if( sigaction(sig, &act, NULL) ) exit(EXIT_FAILURE);
}
      

Example 8. Cancel waits in another thread

This will notify waits in another thread to cancel.


ach_channel_t channel;

...

ach_cancel_attr_t attr;
ach_cancel_attr_init(&attr);
attr.async_unsafe = 1; /* permit functions that are unsafe in signal handlers */
enum ach_status r = ach_cancel(&channel, &attr);
if( ACH_OK != r ) exit(EXIT_FAILURE);
      

2.7. Closing a channel

enum ach_status ach_close(ach_channel_t *channel);


2.8. Deleting a channel

enum ach_status ach_unlink(const char *name);


2.9. Channel Permissions

enum ach_status ach_chmod(ach_channel_t *channel, mode_t mode);

Set the POSIX permission bits of the channel to mode. Note that any channel access requires both the read (4) and write (2) bits to be set, because we must write in order to hold the mutex. The executable bit (1) is irrelevant.

Example 9. Make channel user and group accessible

It is best to specify mode as an octal number. A value of 0660 will give the owning user and group access, and deny access to all others.


      enum ach_status r = ach_get( &channel, 0660 );
    

2.10. Ignore old messages

enum ach_status ach_flush(ach_channel_t *channel);

Updates channel counters to ignore all previously published messages.


3. Language Bindings

3.1. Common Lisp

Ach includes bindings for Common Lisp using CFFI. They can be loaded via ASDF.


3.2. Python

Ach includes bindings for Python using a C extension module and wrapper class.

The module uses Python's Buffer Protocol to obtain a serialized representation for Python objects. This will work with string, bytearray, ctypes.Structure and other types which implement the Buffer Protocol.

Example 10. Python Put


#!/usr/bin/env python
import ach
c = ach.Channel('foo')
c.put('bar')
c.close()
    

Example 11. Python Get


#!/usr/bin/env python
import ach
c = ach.Channel('foo')
c.flush()
b = bytearray(10)
[status, framesize] = c.get( b, wait=True, last=False )
if status == ach.ACH_OK or status == ach.ACH_MISSED_FRAME:
    print b
else:
    raise ach.AchException( c.result_string(status) )
c.close()
    

4. ach Shell Tool

The ach command allows creation and deletion of channels from the shell.

ach {mk | rm | chmod | dump | file} [octal_mode] {chanel_name} [-o octal_mode] [-m frame_count] [-n frame_size] [-t] [-v] [-V] [-?]

Example 12. Create a channel

Create channel named "my_channel" with slots for 10 messages which have a nominal size of 64 bytes.

ach mk my_channel -m 10 -n 64

Example 13. Delete a channel

Delete channel named "my_channel".

ach rm my_channel

Example 14. Create a world-accessible channel

Create channel named "my_channel" which is world-accessible (permisions 666).

ach mk my_channel -m 10 -n 64 -o 666

Example 15. Create a channel unless it already exists

Create channel named "my_channel" unless it already exists, in which case do nothing.

ach mk my_channel -m 10 -n 64 -1

Example 16. Set channel permissions

Make channel accessible only by user and group.

ach chmod 660 my_channel


5. Linux Kernel Module

Ach includes a Linux kernel module which implements channels in kernel-space. This allows library clients to wait for messages on multiple channels (and in combination with any other file descriptors) using the poll() or select() family of functions.


5.1. Kernel Module Installation

The most convenient way to build the kernel module will probably be with DKMS (Dynamic Kernel Module System). DKMS enables building and rebuilding of out-of-tree kernel modules (such as ach, device drivers, etc.) when the kernel is upgraded. On Debian and Ubuntu, the package manager automatically invokes DKMS to rebuild the registered modules. To setup ach for DKMS, call configure with ./configure --enable-dkms-build, and the module sources for will be installed and built when you call make install.

Example 17. Build kernel module with dkms


./configure --enable-dkms-build
make
sudo make install

Alternatively, you can build the module in the ach source tree directly by calling ./configure --enable-kbuild --disable-dkms. If your kernel headers are not in the typical location and/or do not match the currently running kernel, you may to specify the kernel release and header directory.

Example 18. Manually specifying kernel information for kbuild


./configure --enable-kbuild \
            --disable-dkms \
             KERNELRELEASE=3.13.0-44-lowlatency \
             KDIR=/usr/src/linux-headers-3.13.0-44-lowlatency
make
sudo make install

After the kernel module is build and installed, you need to link it into the kernel. This is done with the modprobe command. The kernel module can be removed with rmmod. The module takes one optional parameter, max_devices which specifies the maximum number of kernel channels that can be created.

Example 19. Load kernel module


sudo modprobe ach
    

Example 20. Load kernel module and specify max_devices


sudo modprobe ach max_devices=1024
    

Example 21. Unload kernel module


sudo rmmod ach
    

5.2. Creating Kernel Channels

Kernel channels can be created using the ach command by passing the -k flag.

Example 22. Create kernel channel from shell


ach mk -k my-channel
    

Example 23. Unlink kernel channel from shell


ach rm my-channel
    

5.3. Kernel Channel Permissions

By default, kernel channels can only be created and accessed by root. However, you can use udev to control kernel channel permissions. The following example will set the group ownership of all ach devices to the ach group. The file needs to be placed in /etc/udev/rules.d/.

Example 24. udev rules file to set channel group


# /etc/udev/rules.d/10-ach.rules
# Control permissions for ach character devices

# Set the permissions of the ach control device
KERNEL=="achctrl", GROUP=="ach", MODE=="660"

# Set the permissions of ach channel devices
KERNEL=="ach-*", GROUP=="ach", MODE=="660"

# Create symbolic links for ach channel devices
KERNEL=="ach-*", SYMLINK+="ach/%k"
    

5.4. Waiting on Multiple Kernel Channels

Kernel channels support waiting for multiple events using the poll, select and similar functions. These functions require the file descriptor for the channel.

Example 25. Obtaining the channel file descriptor


ach_channel_t channel;
int channel_fd;
enum ach_status r;

/* Open channel */
/* ... */

/* Get Channel File Descriptor */
r = ach_channel_fd( &channel, &channel_fd );
if( ACH_OK != r ) {
    fprintf(stderr, "could not get file descriptor for channel '%s': %s\n",
            names[i], ach_result_to_string(r));
    exit(EXIT_FAILURE);
}
    

The channel file descriptor can then be passed to poll, select, etc. to determine when there is new data to read. Note that channels are always ready to write.

Example 26. Complete poll() example


#include <stdlib.h>
#include <pthread.h>
#include <inttypes.h>
#include <ach.h>

#include <stdio.h>
#include <poll.h>
#include <unistd.h>

int main(int argc, char **argv)
{
    const char *names[] = {"channel-0", "channel-1"};
    const size_t n = sizeof(names) / sizeof(names[0]);
    ach_channel_t channel[n];
    struct pollfd pfd[n];

    for( size_t i = 0; i < n; i ++ ) {
        /* Open Channel */
        enum ach_status r = ach_open( &channel[i], names[i], NULL );
        if( ACH_OK != r ) {
            fprintf(stderr, "could not open channel '%s': %s\n",
                    names[i], ach_result_to_string(r));
            exit(EXIT_FAILURE);
        }
        /* Get Channel File Descriptor */
        r = ach_channel_fd( &channel[i], &pfd[i].fd );
        if( ACH_OK != r ) {
            fprintf(stderr, "could not get file descriptor for channel '%s': %s\n",
                    names[i], ach_result_to_string(r));
            exit(EXIT_FAILURE);
        }
        /* Set events to poll for */
        pfd[i].events = POLLIN;
    }

    /* read forever */
    for(;;) {
        /* poll for new messages */
        int r_poll = poll( pfd, n, -1 );
        if( r_poll < 0 ) {
            perror("poll");
            exit(EXIT_FAILURE);
        }
        /* find channels with new data */
        for( size_t i = 0; i < n && r_poll > 0; i++ ) {
            if( (pfd[i].revents & POLLIN) ) {
                /* There's new data on this channel */
                char buf[512];
                size_t frame_size;
                enum ach_status r = ach_get( &channel[i], buf, sizeof(buf), &frame_size,
                                             NULL, ACH_O_NONBLOCK | ACH_O_FIRST );
                switch(r) {
                case ACH_OK:
                case ACH_MISSED_FRAME: {
                    /* this example just writes it to stdout */
                    ssize_t wr = write( STDOUT_FILENO, buf, frame_size );
                    if( wr < 0 ) {
                        perror("write");
                        exit(EXIT_FAILURE);
                    }
                    break;
                }
                default:
                    fprintf( stderr, "Error getting data from '%s': %s\n",
                             names[i], ach_result_to_string(r));
                    exit(EXIT_FAILURE);
                }
                r_poll--;
            }
        }
    }
    return 0;
}
    

Tip

The poll function has an advantage over select in that the arguments to poll can be reused over multiple calls while the arguments to select must be recreated for each call. However poll provides only millisecond timing resolution. Instead, the more recent ppoll supports nanosecond timing resolution. For details on these functions, consult the man pages with man -s 2 poll and man -s 2 ppoll.


6. achd Network Daemon

While the primary design goal of Ach is low-latency, single host IPC, the achd daemon additionally provides a way to relay messages over the network.


6.1. Server

achd serve

The achd server should be setup to run from the inetd super-server. Some distributions use the alternative xinetd, which is configured differently.

Note

Inetd and xinetd work by listening for incoming connections on all the ports specified in their configuration. When (x)inetd recieves a new connection, it forks off the process as given in the configuration file, (in this case achd), and hooks the standard input and output of that process to the socket connection.

Tip

On Debian-based systems (including Ubuntu and Mint), you can use the openbsd-inetd package, installed with apt-get install openbsd-inetd. Alternatively, you can install xinetd with with apt-get install xinetd.


6.1.1. Traditional Inetd Configuration

You should add the following line to /etc/inetd.conf, assuming you have installed ach under /usr/local/:


#/etc/inetd.conf
8076  stream  tcp  nowait  nobody  /usr/local/bin/achd /usr/local/bin/achd serve
    

Tip

On Debian-based systems where you have installed ach via the package manger, you can enable achd in inetd by running dpkg-reconfigure ach-utils.

You will probably need to restart inetd after editing the configuration file. How to do this depends on the init system used by your operating system and the particular inetd you are using. Here are some options which may work:

SysV Init (general GNU/Linux)

/etc/init.d/inetd restart

Debian (Ubuntu/Mint) with openbsd-inetd

service openbsd-inetd restart


6.1.2. Xinetd Configuration

If xinetd is run with the with the inetd_compat flag, then you can use a similar configuration to the traditional inetd.


#/etc/inetd.conf
achd  stream  tcp  nowait  nobody  /usr/local/bin/achd /usr/local/bin/achd serve

Otherwise, you can add the following stanza to your xinetd configuration. Typically, this would go in the per-service file /etc/xinetd.d/achd.


#/etc/xinetd.d/achd
service achd
{
	flags           = REUSE
	socket_type     = stream
	protocol        = tcp
	port            = 8076
	wait            = no
	user            = nobody
	server          = /usr/local/bin/achd
	server_args     = serve
	disable         = no
}
    

In addition, you will need to list the service in /etc/services.


#/etc/services
achd           8076/tcp
    

Then, tell xinetd to reload its configuration

SysV Init (general GNU/Linux)

/etc/init.d/xinetd restart

Redhat/Debian/Ubuntu with xinetd

service xinetd reload


6.1.3. Testing Server Configuration

Now, you can test this setup by telnetting to port 8076 on the server: telnet SERVER 8076. If you type in some gibberish (e.g. asdf) followed by a newline, then you should see something like this:


Escape character is '^]'.
asdf
status: 14 # ACH_BAD_HEADER
message: malformed header

.
Connection closed by foreign host.
    

Which will mean that achd is properly setup.

Tip

If achd is not operating properly, check the log files on your machine for more information. The log output will typically be stored in a file such as /var/log/daemon.log, though this depends on how syslog is configured on your system. You can also add the -vv flags to both server achd (in inetd.conf) the client achd commands to enable debugging output. Syslog may store the debug output in /var/log/debug.log.

Tip

For the achd server to access any channel, it must be readable and writable by the user under which achd runs ("nobody" in the above example line for inetd.conf). If you trust all users on the local machine, you can set the channel permissions to "666", but please note that this is not a secure configuration. An alternative is to create a separate user account for achd and ensure that channels are readable and writable by that account or one of its groups.

Warning

Currently, achd performs no authentication, access control, or encryption (this may be added in the future). If security is a concern in your application, it must currently be addressed at the network level, e.g. by physical isolation, firewalling, a VPN such as IPSec, and/or SSH tunnelling.


6.2. Client

From the client machine, you can run achd to push or pull message to or from the server.

achd {push | pull} {hostname} {chanel_name} [-t tcp|udp] [-p port] [-z remote_channel_name] [-u microseconds] [-d] [-r] [-q] [-v] [-V] [-?]

Example 27. Pull channel from server

achd pull server_name channel_name

Example 28. Push channel to server and retry dropped connections

achd -r pull server_name channel_name

Example 29. Pull channel from server via UDP

achd -t udp pull server_name channel_name

Example 30. Push channel to server, running the the background

achd -d push server_name channel_name

Example 31. Pull channel from server at 10Hz

achd pull server_name channel_name -u 100000

To add both encryption and compression, you can also forward achd over SSH. This will tunnel the achd TCP connection through the encrypted and compressed SSH connection.

Example 32. Pull channel over compressed SSH connection

ssh -f -C -L1234:localhost:8076 server_name sleep 120

achd -p 1234 pull localhost channel_name


6.3. Locating Channels via mDNS

Multicast DNS (mDNS) can be used to browse the Ach channels on the local network and to lookup the hostname for an channel. mDNS names are generally in the .local domain To advertise an Ach channel, it is registered as a service with the local mDNS server.

Example 33. Create a channel and register using the Avahi mDNS server

ach mk -a channel_name

Example 34. Browse local network channels using Avahi

avahi-browse -t _ach._tcp

Example 35. Lookup channel origin in achd client

achd pull channel_name

This performs an mDNS query to determine the origin of the specified channel, then connects to that host.

Note

mDNS is a peer-to-peer variation on DNS that reduces the need for prior configuration by automatically discovering services on the local network. Each peer runs an mDNS server, and services, e.g., file shares, printers, Ach channels, are located by sending a multicast query. Popular implementations of mDNS are Bonjour and Avahi.

Some communication systems, which were generally developed prior to mDNS, use specialized naming services. For example, ONC RPC uses the port mapper and CORBA has its own naming service.


7. achcop Watchdog Utility

The achcop program can be used to start daemons, and restart them if they die.

achcop [-P cop-pid-file] [-p child-pid-file] [-e stderr-file] [-o stdout-file] [-d] [-r] [-v] [-V] [-?] -- {child-program} [child-arguments...]


7.1. Features

  • Detach process from terminal and run in background (daemonize), -d flag.

  • Lock PID files and write PIDs, -P and -p flags.

  • Reconnect if connection is broken, -r flag.

  • Redirect standard output and standard error to files, -o and -e flags.

  • Restart process if SIGHUP is received


7.2. Signals

Note

Signals are how POSIX user programs are notified of asynchrous events. They are similar in spirit to low-level interrupts and interrupt service routines. When a process receives a signal, its execution is interrupted, and it runs the specified handler for that signal. For example, when you hit Control-C at a terminal, the process in the forground receives the interrupt signal, SIGINT. The default handler for this signal will terminate the process. For more details, see man 7 signal.

Achcop will respond to the following signals. You can send a signal with the kill shell command or the kill C function.

SIGTERM, SIGINT

Achcop sends SIGTERM to the child, waits for child to exit, and return child's exit status

SIGHUP

Achcop sends SIGTERM to the child, waits for child to exit, and restarts the child

SIGUSR1

Indicates that the child has started successfully. If the child does not send SIGUSR1 and continues running past an initial timeout, achcop assumes it started successfully.

SIGUSR2

Indicates that the child has failed to start properly. If the child fails on its first attempt and before a timeout has elapsed, it will not be restarted. If your child does not send this signal, achcop may wait a few seconds to ensure that the child runs successfully.


7.3. Examples

7.3.1. Shell Commands

The following shell lines show how one might run achcop.

Example 36. Run achd in the background (-d), restarting (-r) if it fails

achcop -d -r -P /var/run/achcop-foo.pid -p /var/run/achd-foo.pid -- achd -s -r push server channel_foo

Example 37. Command achcop to restart achd

kill -HUP `cat /var/run/achcop-foo.pid`

Example 38. Command achcop to terminate achd

kill `cat /var/run/achcop-foo.pid`


7.3.2. Signaling Achcop from Child

The following C code will notify a parent achcop of your child's success or failure. This is an optional step. Achcop will operate properly without these signals; however, providing the signals means achcop does not need to wait for a brief timeout to ensure that your child continues operating.

Example 39. Signal achcop of child success or failure


/* only send signal if running under achcop */
if( my_parent_is_achcop ) {
    int sig;
    /* determine signal to send */
    if( initialized_correctly ) {
        sig = SIGUSR1; /* success */
    } else {
        sig = SIGUSR2; /* failure */
    }
    /* send the signal */
    int r = kill( getppid(), sig );
    /* check that we sent the signal */
    if( r ) {
        perror("Couldn't signal parent");
        exit(EXIT_FAILURE);
    }
}
    

Note that the default signal handlers for SIGUSR1 and SIGUSER2 terminate the process; therefore, if you send these signals to your shell, your shell will terminate.


8. achlog Logging Utility

The achlog program can be used to log messages from an ach channel to disk.

achlog [-z] [-V] [-?] {channels...}


8.1. Log Format

The Log format is similar to the TCP network protocol for achpipe. The log files begins with a sequence of ASCII key-value headers, followed by a single "." character, followed by a sequence of messages.

Example 40. achlog headers


ACHLOG
channel-name: foo
log-version: 0
log-time-ach: 6226133.801454417
log-time-real: 1371238788.956337009 # Fri Jun 14 15:39:48 2013
local-host: daneel
user: ntd # Neil T. Dantam
.

Messages are framed in the file with the following C struct:


struct {
    uint8_t reserved[8];   /* reserved for future use */
    uint8_t size_bytes[8]; /* size of data, stored little endian */
    uint8_t data[1];       /* flexible array containing size_bytes of data */
};
    

8.2. Usage

Example 41. Log channel foo

achlog foo

Example 42. Log channel foo and bar, compress with gzip

achlog -z foo bar

Example 43. Log channel foo in the background

achcop -d -P /var/run/achcop-achlog-foo.pid -p /var/run/achlog-foo.pid -- achlog foo

Example 44. Stop background logging

kill `cat /var/run/achcop-achlog-foo.pid`


9. Performance Tuning

The performance of your real-time application will be highly dependent on your underlying hardware and operating system. It is quite likely than any off-the-shelf hardware or operating system you use will be tuned for something other than low-latency real-time operation, e.g. maximum thoughput or minimum power consumption. However, there are several paremeters to adjust to improve your performance.


9.1. Disable CPU frequency scaling

Many current CPUs will lower their frequency when idle in order to reduce consumption. This is great for extending your laptop's battery life but poor for minimizing latency. On an x86/amd64 PC, you can likely disable frequency scaling from the BIOS, which will reduce your system's latency. In the author's experience, simply changing to a "performance" CPU governer from within the operating system did NOT provide the same latency reduction; you need to disable freqency scaling from the BIOS.


9.2. Use a Real-Time Operating System

General purpose operating systems (e.g. GNU/Linux, FreeBSD) are designed to maximize throughput and perhaps to reduce latency to "human-tolerable" (0.1s) levels. The worst-case performance may be sacrificed in order to improve average-case performance. For a real-time application, it is generally worst-case performance that matters rather than average case, and latency is more important than throughput.


9.2.1. Linux PREEMPT_RT

If using a Linux kernel, you can apply the PREEMPT_RT patch to allow a fully-preemptible kernel. Without this, a low priority task in the middle of a system call may block a high priority task from running until the system call completes. With a fully-preemptible kernel, the high priority task gets to run immediately.


9.2.2. Linux / Xenomai

Xenomai is a dual-kernel real-time operating system. A hard real-time kernel, Adeos, handles interrupts and a full Linux kernel runs along-side, permitting normal Linux processes to run directly. Real-time processes must use a specific Xenomai API skin. One of these skins is POSIX, and this POSIX interface is sufficient to use Ach for real-time Xenomai processes.

Warning

There is one important caveat when using Ach on Xenomai. Because of the dual-kernel approach, Xenomai scheduling primitives -- mutexes and condition variables -- are different from the Linux primitives; a Linux process and a Xenomai process cannot synchronize on the same condition variable. Therefore, Ach compiled for Xenomai cannot directly share channels with Ach compiled for Linux. If you want a Linux process and a Xenomai process on the same machine to communicate via Ach, you can use achd to forward the messages between the two sides. This issue does not affect messages forwarded between different machines.


9.2.3. Other Real-Time Operating Systems

While Ach development has focused on Linux, it should be portable to other operating systems which provide the POSIX real-time extensions. In particular, Ach depends on POSIX shared memory and process-shared mutexes and condition variables. Contributions to increase portability are extremely welcome.


9.3. Benchmark your system

You can use the included achbench command to test performance on your system while you try different configurations. This program will fork a specified number of publishers and subscribers and measure the latency of message passing.

Example 45. Benchmarking Latency

Measure latency with one publisher and two subscribers over ten seconds

achbench -p 1 -r 2 -s 10 > output_file


10. Theory of Operation

POSIX provides a rich variety of IPC mechanisms, but none of them are ideal for real-time control. The fundamental difference is that as soon as a new sample of the signal (in the oscilloscope sense, not a POSIX signal) is produced, nearly everything no longer cares about older samples. Thus, we want to always favor new data over old data whereas general-purpose POSIX IPC favors the old data. This problem is typically referred to as Head of Line (HOL) Blocking. The exception to this is POSIX shared memory. However, synchronization of shared memory is a difficult programming problem, making the typical and direct use of POSIX shared memory unfavorable for developing robust systems. Furthermore, some parts of the system, such as logging, may need to access older samples, so this also should be permitted at least on a best-effort basis.


10.1. Data Structure

The core data structure of an Ach channel is a pair of circular arrays located in the POSIX shared memory file. There is a data array and an index array. The data array contains variable sized entries which store the actual message frames sent through the Ach channel. The index array contains fixed size entries where each entry has both an offset into the data array and the length of that data entry. A head offset into each array indicates both the place to insert the next data and the location of the most recent message frame. This pair of circular arrays allows us to find the variable sized message frames by first looking at a known offset in the fixed-sized index array.

Access to the channel is synchronized using a mutex and condition variable. This allows readers to either periodically poll the channel for new data or to wait on the condition variable until a writer has posted a new message. Using a read/write lock instead would have allowed only polling. Additionally, synchronization using a mutex prevents starvation and enables proper priority inheritance between processes, important to maintaining real-time performance.


10.2. Core Procedures

Two procedures compose the core of ach: ach_put and ach_get.


10.2.1. ach_put

The procedure ach_put inserts new messages into the channel. Its function is analogous to write, sendmsg, and mq_send. The procedure is given a pointer to the shared memory region for the channel and a byte array containing the message to post. There are four broad steps to the procedure:

  1. Get an index entry. If there is at least one free index entry, use it. Otherwise, clear the oldest index entry and its corresponding message in the data array.

  2. Make room in the data array. If there is enough room already, continue. Otherwise, repeatedly free the oldest message until there is enough room.

  3. Copy the message into data array.

  4. Update the offset and free counts in the channel structure.


10.2.2. ach_get

The procedure ach_get receives a message from the channel. Its function is analogous to read, recvmsg, and mq_receive. The procedure takes a pointer to the shared memory region, a storage buffer to copy the message to, the last message sequence number received, the next index offset to check for a message, and option flags indicating whether to block waiting for a new message and whether to return the newest message bypassing any older unseen messages.

  1. If we are to wait for a new message and there is no new message, then wait. Otherwise, if there are no new messages, return a status code indicating this fact.

  2. Find the index entry to use. If we are to return the newest message, use that entry. Otherwise, if the next entry we expected to use contains the next sequence number we expect to see, use that entry. Otherwise, use the oldest entry.

  3. According to the offset and size from the selected index entry, copy the message from the data array into the provided storage buffer.

  4. Update the sequence number count and next index entry offset for this receiver.


10.3. Serialization

Ach is orthogonal to the issue of serialization: it only handles raw byte arrays. Thus, you can choose whatever type of serialization is most appropriate for you application. For maximum performance, it may be appropriate to define message types as simple C structures. If more flexibility is required, there are a variety of other options to choose from: XDR, ASN.1, Protocol Buffers, etc.


Colophon

This manual is written in Docbook XML.

Redistribution and use in source (XML DocBook) and 'compiled' forms (SGML, HTML, PDF, PostScript, RTF and so forth) with or without modification, are permitted provided that the following conditions are met:

  1. Redistributions of source code (XML DocBook) must retain the above copyright notice, this list of conditions and the following disclaimer as the first lines of this file unmodified.

  2. Redistributions in compiled form (transformed to other DTDs, converted to PDF, PostScript, RTF and other formats) must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.