Introducing Ahab — Docker Event Handling

Jason Dusek
Published in tech-at-instacart
6 min read · Nov 19, 2015

The Docker event stream offers a powerful way to customize Docker networking. Today, Instacart is open sourcing Ahab, a Docker event handling library.

Docker at Instacart

At Instacart, we use Docker for test and staging environments in a configuration that has been live since late September of 2015. A long-standing challenge with Docker is how to allow developers to SSH into containers. By default, Docker provides an IP address to every container; but that IP is on a network internal to the Docker host. Docker does provide some alternative modes of networking, and while so-called host networking comes close to meeting our needs, it would allow but one container per host to make use of port 22 for SSH (and then what would the host use?).

As a general rule we have held to convention over configuration at all layers of our stack. In terms of system deployment, this means we assign meaningful hostnames to all nodes and that we run services on their standard ports. This alleviates two important burdens for us — maintaining a mapping of services to non-standard ports and maintaining a mapping of IPs to names — and it allows us to leverage tools in their stock configurations instead of writing and maintaining wrappers.

To deliver on this approach for Docker, we need to assign an IP to each Docker container as it comes up. We can easily attach spare IPs to the Docker host’s NIC; and using NAT, we can send traffic on a spare IP to a Docker container’s Docker-assigned IP. (Here we are sticking with Docker’s conventional mode of IP assignment.) But how are we to run the NAT rule at the right time? And when the Docker container disappears, how do we ensure the rule is removed?
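To make the NAT idea concrete, here is a minimal sketch of the pair of rules involved, written as iptables command lines built in Python. The function name nat_commands and the example addresses are hypothetical; the DOCKER and POSTROUTING chains mirror the ones used by the hook script later in this post. Nothing is executed here, the commands are only constructed.

```python
def nat_commands(spare_ip, container_ip, action='-I'):
    """Build (but do not run) the two iptables commands for one container.

    action is '-I' to insert the rules or '-D' to delete them.
    nat_commands is a hypothetical helper, not part of Ahab.
    """
    # Inbound: packets addressed to the spare IP are sent to the container.
    dnat = ['iptables', '-t', 'nat', action, 'DOCKER',
            '-d', spare_ip, '-j', 'DNAT', '--to-destination', container_ip]
    # Outbound: packets from the container leave with the spare IP as source.
    snat = ['iptables', '-t', 'nat', action, 'POSTROUTING',
            '-s', container_ip, '-j', 'SNAT', '--to-source', spare_ip]
    return [dnat, snat]


if __name__ == '__main__':
    # Example addresses are made up for illustration.
    for cmd in nat_commands('10.0.0.50', '172.17.0.2'):
        print(' '.join(cmd))
```

The open questions in the paragraph above are then about timing: something has to run the insert form when a container starts and the delete form when it goes away.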

Working with Events

Fundamentally, systems of this kind invite two approaches: state synchronization and event handling. Synchronizing state is often the easiest approach for complex cases, because it is easy to verify that the “result” (in our case, a set of IPTables rules) corresponds to a given state (in our case, a list of Docker containers). However, state synchronization often runs afoul of latency due to polling, unless you are using events to synchronize an internal model of the state, which brings us back to event handling.
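As a rough illustration of the state synchronization approach (this is not how Ahab works), the sketch below compares a desired state with an actual state and reports what would need to change. The function name reconcile and the sample hostnames and addresses are hypothetical.

```python
def reconcile(containers, rules):
    """Compare desired state with actual state.

    containers: set of hostnames of running containers (desired state).
    rules: dict mapping hostname -> spare IP for rules currently installed.
    Returns (to_add, to_remove) as sorted lists of hostnames.
    """
    installed = set(rules)
    to_add = sorted(containers - installed)      # running, but no rule yet
    to_remove = sorted(installed - containers)   # rule left behind
    return to_add, to_remove


if __name__ == '__main__':
    # Sample data, made up for illustration.
    running = {'web-1', 'web-2'}
    installed = {'web-2': '10.0.0.51', 'old-job': '10.0.0.52'}
    print(reconcile(running, installed))
```

A poller would call something like this on a timer, and the timer interval is where the latency mentioned above comes from.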

By approaching the problem as one of handling individual events, we can expect low latency (each state change is handled as soon as it is registered) and often quite short and intuitive code. The trade-off, however, is that a missed event (for example, one that fires while the event handler is being restarted) can result in nonsensical behavior. We’d like the event handler to start from a “base state” that makes sense, and this can be hard to arrange. In our case, the problem arises if the event handler stops, a container starts, and then the event handler starts again. The handler never sees the start event; and when it sees a stop event for the container, it will attempt to remove a NAT rule that isn’t there. A similar problem exists for containers that stop while the event handler is down (perhaps to be updated): the NAT rule will never be removed, causing a conflict when we later wish to use that particular spare IP for another container. In practice these problems can be resolved through idempotency, that is, by making each operation safe to repeat or to apply when its work has already been done; and because the event handler is very stable and is upgraded fairly infrequently, at Instacart we have accepted the possibility that we may have to reboot the Docker hosts from time to time to clear state. (Interestingly, it’s not something we’ve had to do.)
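To show what idempotent handling can look like, here is a small sketch that uses an in-memory dictionary as a stand-in for the NAT rules. The class NatTable and its method names are hypothetical, and it picks the lowest free address rather than a random one, purely to keep the example deterministic.

```python
class NatTable(object):
    """In-memory stand-in for the NAT rules, keyed by container hostname."""

    def __init__(self, spare_ips):
        self.spare_ips = set(spare_ips)
        self.rules = {}  # hostname -> spare IP

    def on_start(self, host):
        # Idempotent: a repeated or replayed 'start' keeps the existing rule.
        if host in self.rules:
            return self.rules[host]
        free = sorted(self.spare_ips - set(self.rules.values()))
        if not free:
            raise RuntimeError('no spare IPs left for %s' % host)
        self.rules[host] = free[0]
        return free[0]

    def on_die(self, host):
        # Idempotent: removing a rule that is not there is a no-op.
        self.rules.pop(host, None)


if __name__ == '__main__':
    nat = NatTable(['10.0.0.50', '10.0.0.51'])
    nat.on_die('web-1')    # 'die' with no prior 'start': nothing happens
    nat.on_start('web-1')
    nat.on_start('web-1')  # duplicate 'start': still one rule
    print(nat.rules)
```

With handlers shaped like this, a stop event for a container the handler never saw start does no harm. Rules left behind while the handler was down would still need a separate cleanup pass.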

A Simple Event Handler

The default listener class in Ahab accepts a list of functions as consumers. Here is an example script that, when run on a Docker host with Docker on the default port, prints each Docker event as it arrives:

from datetime import datetime
from ahab import Ahab

def f(event, data):
    print(datetime.utcnow())
    print(event)
    print(data)

listener = Ahab(handlers=[f])
listener.listen()

(Ahab provides a similar function, Ahab.default, for you. It prints with a Python logger instead of print.)

The event is a semi-structured representation of the underlying Docker event, and the data provides information about the affected object (if available).

event = { 'status': 'tag',
          'timeNano': 1447720921780124063,
          'id': 'internal/image:',
          'time': datetime.datetime(2015, 11, 16, 16, 42, 1) }

Docker posts events for containers, images and tags. You can distinguish container events from the others by the presence of a hostname.

def f(event, data):
    print(datetime.utcnow())
    if 'Config' in data and 'Hostname' in data['Config']:
        print('Event %s looks like a container event.' % event['id'])

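The hostname check can also be factored out so that several handlers share it. The following is only a sketch: is_container_event and container_events_only are hypothetical helpers, not part of Ahab, and the sample payloads are hand-built rather than captured from a real Docker daemon. It assumes the data argument is a dictionary, or empty when no details are available.

```python
def is_container_event(data):
    """True when the data looks like container details (it has a hostname)."""
    config = (data or {}).get('Config') or {}
    return 'Hostname' in config


def container_events_only(handler):
    """Wrap a handler so it is only called for container events."""
    def wrapped(event, data):
        if is_container_event(data):
            return handler(event, data)
    return wrapped


if __name__ == '__main__':
    seen = []

    def record(event, data):
        seen.append(event['status'])

    handler = container_events_only(record)
    # Hand-built sample payloads for illustration.
    handler({'status': 'start', 'id': 'abc123'},
            {'Config': {'Hostname': 'web-1'}})
    handler({'status': 'tag', 'id': 'internal/image:'}, {})
    print(seen)
```

A function wrapped this way takes the same (event, data) arguments as f, so it could be passed in the handlers list in the same manner.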
To see what kinds of data your local Docker installation posts for different events, you can run the ahab command line tool with debug logging enabled:

ahab --console debug

Dynamic NAT Rules: Bringing Together iptc & ahab

To make it easier to build on Ahab, we've put together a hook script that finds and assigns secondary IPs to Docker containers as they come online, and returns the IPs to the pool as the containers terminate.

#!/usr/bin/env python
from contextlib import contextmanager
import logging
from pprint import pformat
from random import randint
import subprocess

from ahab import Ahab
import iptc

log = logging.getLogger()

def main():
    logging.basicConfig(level=logging.INFO)
    listener = Ahab(handlers=[nat_handler])
    listener.listen()

def nat_handler(event, data):
    log.info('Event:\n%s', pformat(event))
    if 'Config' in data and 'Hostname' in data['Config']:
        ident = data['Id']
        f = {
            'start': create_nat,  # On 'start', we create the NAT rules
            'die': clear_nat      # On 'die', we remove them
        }.get(event['status'])
        # The 'start' and 'die' events are the only ones relevant for
        # managing our NAT rules.
        if f is None:
            return
        host = data['Config']['Hostname']
        ip = data['NetworkSettings']['IPAddress']
        # We make a few attempts at the IP Tables operation, in case
        # there is overlap with another event handler trying to do the
        # same thing for another container.
        for n in range(1, 5):
            try:
                f(host, ip)
                break
            except iptc.IPTCError as e:
                if 'Resource temporarily unavailable' not in str(e):
                    log.error('IP Tables trouble for %s during NAT '
                              'setup, not continuing: %s', ident, e)
                    break
            except Exception as e:
                log.error('Unexpected error while handling NAT for %s: '
                          '%s', ident, e)
                break
        # No matter what happens, we don't error out, because that
        # would crash other handlers that might be in the midst of
        # configuring other containers.

def create_nat(host, container_ip):
    with table(iptc.Table.NAT) as nat:
        free_ips = list(secondary_ips() - ips_in_use())
        free = free_ips[randint(1, len(free_ips)) - 1]
        # Send packets that come in on the outer IP to the inner IP.
        dnat = iptc.Rule()
        dnat.dst = free
        target = dnat.create_target('DNAT')
        target.to_destination = container_ip
        comment = dnat.create_match('comment')
        comment.comment = 'ahab//' + host
        iptc.Chain(nat, 'DOCKER').insert_rule(dnat)
        # Rewrite packets from the inner IP so they go out on the outer IP.
        snat = iptc.Rule()
        snat.src = container_ip
        target = snat.create_target('SNAT')
        target.to_source = free
        comment = snat.create_match('comment')
        comment.comment = 'ahab//' + host
        iptc.Chain(nat, 'POSTROUTING').insert_rule(snat)

def clear_nat(host, container_ip):
    del container_ip  # Could be used for sanity check
    with table(iptc.Table.NAT) as nat:
        token = 'ahab//' + host
        chains = ['DOCKER', 'POSTROUTING']
        for chain in [iptc.Chain(nat, name) for name in chains]:
            for rule in chain.rules:
                comments = [m for m in rule.matches if m.name == 'comment']
                if any(c.comment == token for c in comments):
                    chain.delete_rule(rule)

def ips_in_use():
    with table(iptc.Table.NAT) as nat:
        ips = set()
        token = 'ahab//'
        chains = ['DOCKER', 'POSTROUTING']
        for chain in [iptc.Chain(nat, name) for name in chains]:
            for rule in chain.rules:
                comments = [m for m in rule.matches if m.name == 'comment']
                if any(c.comment.startswith(token) for c in comments):
                    if rule.dst is not None:
                        ips |= set([rule.dst.split('/')[0]])
        log.info('IPs in use: %s', ips)
        return ips

def secondary_ips():
    secondary_ips = []
    script = 'ip addr list dev eth0 | fgrep secondary'
    text = subprocess.check_output(['sh', '-c', script])
    for line in text.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue
        secondary_ips += [fields[1].split('/')[0]]
    return set(secondary_ips)

open_tables = {}

@contextmanager
def table(tab):
    """Access IPTables transactionally in a uniform way.

    Ensures all access is done without autocommit and that only the
    outermost task commits, and also ensures we refresh once and commit once.
    """
    global open_tables
    if tab in open_tables:
        yield open_tables[tab]
    else:
        open_tables[tab] = iptc.Table(tab)
        open_tables[tab].refresh()
        open_tables[tab].autocommit = False
        try:
            yield open_tables[tab]
            open_tables[tab].commit()
        finally:
            # Always drop the cached table, even if the body or the commit
            # raised, so a failure does not leave a stale table behind.
            del open_tables[tab]

if __name__ == '__main__':
    main()

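The address discovery step in the script can be tried out without a live network interface. The sketch below parses a hand-written sample in the style of `ip addr list dev eth0` output and picks a free address. The names parse_secondary_ips and pick_free_ip and the sample text are assumptions for illustration. It also returns None when the pool is empty instead of raising, which is a different choice from the script above.

```python
import random

# Hand-written sample in the style of `ip addr list dev eth0` output.
SAMPLE = """\
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 9001 qdisc pfifo_fast state UP
    link/ether 0a:1b:2c:3d:4e:5f brd ff:ff:ff:ff:ff:ff
    inet 10.0.0.10/24 brd 10.0.0.255 scope global eth0
    inet 10.0.0.50/24 brd 10.0.0.255 scope global secondary eth0
    inet 10.0.0.51/24 brd 10.0.0.255 scope global secondary eth0
"""


def parse_secondary_ips(text):
    """Return the set of addresses on 'inet' lines marked 'secondary'."""
    ips = set()
    for line in text.splitlines():
        fields = line.split()
        # Expect lines like: inet 10.0.0.50/24 ... scope global secondary eth0
        if len(fields) >= 2 and fields[0] == 'inet' and 'secondary' in fields:
            ips.add(fields[1].split('/')[0])
    return ips


def pick_free_ip(secondary, in_use, rng=random):
    """Pick a random unused secondary IP, or None when the pool is empty."""
    free = sorted(set(secondary) - set(in_use))
    return rng.choice(free) if free else None


if __name__ == '__main__':
    pool = parse_secondary_ips(SAMPLE)
    print(sorted(pool))
    print(pick_free_ip(pool, in_use={'10.0.0.50'}))
```

One caveat when adapting this: under Python 3, subprocess.check_output returns bytes, so its result would need to be decoded before being passed to a text parser like this one.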
Here at Instacart, we love open source. Feel free to check our other open source projects here. If you love working on open source too, we should talk ;)!
