On the Confluent website, you can find this title:
Stream data changes everything
From the creators of Kafka, a real-time messaging system, this is not a surprising assertion. Still, data streaming infrastructures have gained in popularity, and many projects require data to be processed as soon as it shows up. This has contributed to the development of well-known technologies like Spark Streaming and Apache Storm and, more broadly, to the adoption of websockets.
Websockets in particular brought real-time data feeds to web applications by keeping a low-latency, two-way connection open between client and server. Coupled with the asynchronous Node.js, they let you build a powerful event-based reactive system. But what about Python? Given the popularity of the language in data science, could it bring the benefits of this kind of data ingestion too? As this two-part post series will show, it turns out that modern Python (Python 3.4 or later) supports asynchronous data streaming apps.
Python 3.4 added the asyncio module to the standard library to provide the language with:
Asynchronous I/O, event loop, coroutines and tasks
While Python treats functions as first-class objects (meaning you can assign them to variables and pass them as arguments), most developers follow an imperative programming style. This seems to be on purpose:
It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code. - Guido van Rossum
So asyncio is the Pythonic answer to asynchronous programming. This paradigm makes a lot of sense for otherwise costly I/O operations, or whenever events need to trigger code.
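To see why this matters for I/O-bound work, here is a minimal, standard-library-only sketch (the `fake_io` and `blocking_io` coroutines and their delays are made up for illustration). Two simulated I/O waits overlap when they await `asyncio.sleep`, whereas a blocking `time.sleep` inside a coroutine holds the event loop and serializes them.

```python
import asyncio
import time


async def fake_io(name, delay):
    # Hypothetical I/O call: awaiting asyncio.sleep hands control back to the event loop
    await asyncio.sleep(delay)
    return name


async def blocking_io(name, delay):
    # Anti-pattern: time.sleep blocks the whole event loop, so nothing else runs meanwhile
    time.sleep(delay)
    return name


def timed(loop, *coroutines):
    """Run the coroutines concurrently with asyncio.gather; return (results, seconds)."""
    async def runner():
        # gather is called inside the running loop, so it schedules on that loop
        return await asyncio.gather(*coroutines)

    start = time.monotonic()
    results = loop.run_until_complete(runner())
    return results, time.monotonic() - start


loop = asyncio.new_event_loop()
concurrent, fast = timed(loop, fake_io('a', 0.2), fake_io('b', 0.2))
serialized, slow = timed(loop, blocking_io('a', 0.2), blocking_io('b', 0.2))
loop.close()
print(concurrent, round(fast, 1))   # the waits overlap: roughly 0.2 s in total
print(serialized, round(slow, 1))   # the waits add up: roughly 0.4 s in total
```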
For fun and profit, let's build such a project. We will simulate a dummy electrical circuit composed of three components: a clock, an IO pin, and a buzzer.
This sets us up with an interesting machine-to-machine communication problem to solve.
Note that the code snippets in this post make use of features like async and await, introduced in Python 3.5. While it would be possible to backport them to Python 3.4, I highly recommend that you follow along with the same version or newer. Anaconda or pyenv can ease the installation process if necessary.
$ python --version
Python 3.5.1
$ pip --version
pip 8.1.2
Our first step, the clock, will introduce both asyncio and websocket basics. We need a straightforward coroutine that fires tick signals through a websocket and waits for an acknowledgement after each one.
# filename: sketch.py
async def clock(socket, port, tacks=3, delay=1):
    ...  # body filled in below
The async keyword is syntactic sugar introduced in Python 3.5 to replace the previous @asyncio.coroutine decorator. The official PEP 492 explains it all, but the tl;dr is API quality.
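A quick, standard-library-only illustration of what `async def` produces: calling such a function does not run its body, it only creates a coroutine object that an event loop (or another coroutine, via `await`) must drive. The pre-3.5 spelling with `@asyncio.coroutine` and `yield from` appears only as a comment, because that decorator was removed in Python 3.11. The function names are hypothetical.

```python
import asyncio
import inspect

# Pre-3.5 equivalent (removed in Python 3.11, shown for comparison only):
#
#     @asyncio.coroutine
#     def double(x):
#         yield from asyncio.sleep(0)
#         return 2 * x


async def double(x):
    # Native coroutine (PEP 492): await replaces yield from
    await asyncio.sleep(0)
    return 2 * x


async def add_doubles(a, b):
    # Awaiting a coroutine runs it to completion and returns its result
    return await double(a) + await double(b)


pending = add_doubles(1, 2)
print(inspect.iscoroutine(pending))        # True: nothing has run yet
loop = asyncio.new_event_loop()
total = loop.run_until_complete(pending)   # the event loop drives the coroutine
loop.close()
print(total)                               # 6
```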
To simplify websocket connection plumbing, we can take advantage of the eponymous package: pip install websockets==3.5.1. It hides the protocol's complexity behind an elegant context manager.
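`async with` works with any object implementing the asynchronous context manager protocol from PEP 492, that is, awaitable `__aenter__` and `__aexit__` methods. The sketch below uses a made-up, in-memory `EchoConnection` class (not part of the websockets API) to show the shape of what such a context manager does: open on entry, and close on exit even if the body raises.

```python
import asyncio


class EchoConnection:
    """Hypothetical stand-in for a network connection; echoes back what it is sent."""

    def __init__(self, uri):
        self.uri = uri
        self.open = False
        self._inbox = []

    async def __aenter__(self):
        # A real client would perform its opening handshake here
        self.open = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Always runs, even if the body raised: close the connection
        self.open = False
        return False  # do not swallow exceptions

    async def send(self, message):
        self._inbox.append(message)

    async def recv(self):
        return self._inbox.pop(0)


async def demo():
    async with EchoConnection('ws://localhost:8765/datafeed') as conn:
        await conn.send('ping')
        reply = await conn.recv()
    return reply, conn.open


loop = asyncio.new_event_loop()
reply, still_open = loop.run_until_complete(demo())
loop.close()
print(reply, still_open)  # ping False
```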
# filename: sketch.py
import asyncio

import websockets


async def clock(socket, port, tacks=3, delay=1):
    # the path "datafeed" in this URI is available on the server side,
    # but we won't use it in this example
    uri = 'ws://{socket}:{port}/datafeed'.format(socket=socket, port=port)
    # manage the connection asynchronously
    async with websockets.connect(uri) as ws:
        for payload in range(tacks):
            print('[ clock ] > {}'.format(payload))
            # send payload and wait for acknowledgement
            await ws.send(str(payload))
            print('[ clock ] < {}'.format(await ws.recv()))
            # pause without blocking the event loop (time.sleep would block it)
            await asyncio.sleep(delay)
The await keyword was introduced alongside async and replaces the old yield from: it suspends the current coroutine until the awaited one returns its value. Inside the context manager, the connection stays open and we can stream data to the server we contacted.
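The send-then-acknowledge exchange can be tried without any network at all. In this sketch, two `asyncio.Queue` objects stand in for the two directions of the websocket (an assumption for illustration only), and the hypothetical `fake_pin` coroutine plays the server. The pause between ticks is awaited with `asyncio.sleep`, because a blocking call such as `time.sleep` inside a coroutine would freeze every other task sharing the event loop.

```python
import asyncio


async def fake_clock(to_pin, from_pin, tacks=3, delay=0.01):
    """Hypothetical clock: send `tacks` ticks, waiting for an acknowledgement after each."""
    acks = []
    for payload in range(tacks):
        await to_pin.put(str(payload))
        acks.append(await from_pin.get())  # wait for the reply before the next tick
        await asyncio.sleep(delay)          # non-blocking pause between ticks
    await to_pin.put(None)                  # hypothetical end-of-stream marker
    return acks


async def fake_pin(from_clock, to_clock):
    """Hypothetical receiver: acknowledge every tick until the end marker arrives."""
    while True:
        message = await from_clock.get()
        if message is None:
            break
        await to_clock.put('ack {}'.format(message))


async def main():
    # Two queues model the two directions of a single connection
    clock_to_pin, pin_to_clock = asyncio.Queue(), asyncio.Queue()
    receiver = asyncio.ensure_future(fake_pin(clock_to_pin, pin_to_clock))
    acks = await fake_clock(clock_to_pin, pin_to_clock)
    await receiver
    return acks


loop = asyncio.new_event_loop()
acks = loop.run_until_complete(main())
loop.close()
print(acks)  # ['ack 0', 'ack 1', 'ack 2']
```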
At the core of our application are entities capable of speaking to each other directly. To make things fun, we will expose the same API as Arduino sketches: a setup method that runs once at startup, and a loop method called whenever new data is available.
# -*- coding: utf-8 -*-
# vim_fenc=utf-8
#
# filename: factory.py

import abc
import asyncio

import websockets


# Python 3 syntax: a `__metaclass__` class attribute is ignored by Python 3
class FactoryLoop(metaclass=abc.ABCMeta):
    """ Glue components to manage the evented-loop model. """

    def __init__(self, *args, **kwargs):
        # call user-defined initialization
        self.setup(*args, **kwargs)

    def out(self, text):
        print('[ {} ] {}'.format(type(self).__name__, text))

    @abc.abstractmethod
    def setup(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    async def loop(self, channel, data):
        pass

    def run(self, host, port):
        event_loop = asyncio.get_event_loop()
        try:
            server = websockets.serve(self.loop, host, port)
            self.out('serving on {}:{}'.format(host, port))
            event_loop.run_until_complete(server)
            event_loop.run_forever()
        except OSError:
            self.out('Cannot bind to this port! Is the server already running?')
        except KeyboardInterrupt:
            self.out('Keyboard interruption, aborting.')
            event_loop.stop()
        finally:
            event_loop.close()
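The child classes are required to implement setup and loop, while this class takes care of calling setup at initialization, registering loop as the websocket server callback, and running the event loop until it is interrupted.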
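That contract on setup and loop is only enforced when the metaclass is declared with Python 3 syntax, `class FactoryLoop(metaclass=abc.ABCMeta)`; a class attribute named `__metaclass__` is a Python 2 idiom that Python 3 silently treats as an ordinary attribute. The following sketch, using throwaway class names, checks both behaviours.

```python
import abc


class Py2Style(object):
    __metaclass__ = abc.ABCMeta  # Python 2 idiom: Python 3 treats it as a plain attribute

    @abc.abstractmethod
    def setup(self):
        pass


class Py3Style(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def setup(self):
        pass

    @abc.abstractmethod
    async def loop(self, channel, data):
        pass


class Incomplete(Py3Style):
    def setup(self):  # loop() is deliberately missing
        pass


def refuses(cls):
    """Return True if instantiating cls raises TypeError (abstract methods left)."""
    try:
        cls()
    except TypeError:
        return True
    return False


py2_enforced = refuses(Py2Style)
py3_enforced = refuses(Incomplete)
print(py2_enforced, py3_enforced)  # False True
```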
The websockets documentation states that the server callback is expected to have the signature on_connection(websocket, path). This is too low-level for our purpose. Instead, we can write a decorator that manages asyncio details, message passing, and error handling, and only calls self.loop with application-level information: the actual message and the websocket path.
# filename: factory.py
import functools

import websockets


def reactive(fn):
    @functools.wraps(fn)
    async def on_connection(klass, websocket, path):
        """Dispatch events and wrap execution."""
        # `klass` is the sketch instance, since the decorated method is called bound
        klass.out('** new client connected, path={}'.format(path))
        # process messages until the connection is closed
        while True:
            try:
                message = await websocket.recv()
                acknowledgement = await fn(klass, path, message)
                await websocket.send(acknowledgement or 'n/a')
            except websockets.exceptions.ConnectionClosed as e:
                klass.out('done processing messages: {}\n'.format(e))
                break
    return on_connection
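To exercise this decorator pattern without a network, here is a self-contained variant. `FakeSocket` and `Closed` are hypothetical stand-ins for a websockets connection and `websockets.exceptions.ConnectionClosed`, and `Echo` is a made-up sketch class. The dispatch logic mirrors `reactive`: receive, call the wrapped handler with `(path, message)`, then send back its answer or `'n/a'`.

```python
import asyncio
import functools


class Closed(Exception):
    """Hypothetical stand-in for websockets.exceptions.ConnectionClosed."""


class FakeSocket:
    """In-memory socket: yields queued messages, then reports the connection closed."""

    def __init__(self, messages):
        self.incoming = list(messages)
        self.sent = []

    async def recv(self):
        if not self.incoming:
            raise Closed('client went away')
        return self.incoming.pop(0)

    async def send(self, text):
        self.sent.append(text)


def reactive(fn):
    @functools.wraps(fn)
    async def on_connection(instance, websocket, path):
        while True:
            try:
                message = await websocket.recv()
                acknowledgement = await fn(instance, path, message)
                await websocket.send(acknowledgement or 'n/a')
            except Closed:
                break
    return on_connection


class Echo:
    @reactive
    async def loop(self, channel, msg):
        # Answer only even numbers; returning None falls back to 'n/a'
        if int(msg) % 2 == 0:
            return '{} got {}'.format(channel, msg)


socket = FakeSocket(['0', '1', '2'])
event_loop = asyncio.new_event_loop()
event_loop.run_until_complete(Echo().loop(socket, '/datafeed'))
event_loop.close()
print(socket.sent)  # ['/datafeed got 0', 'n/a', '/datafeed got 2']
```

Because of `functools.wraps`, the decorated method keeps its original name (`Echo.loop.__name__` is still `'loop'`), which keeps tracebacks and logs readable.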
Now we can write a readable IOPin class.
# filename: sketch.py
import random

import factory


class IOPin(factory.FactoryLoop):
    """Set an IO pin to 0 or 1 randomly."""

    def setup(self, chance=0.5, sequence=3):
        self.chance = chance
        self.sequence = sequence

    def state(self):
        """Toggle state, sometimes."""
        return 0 if random.random() < self.chance else 1

    @factory.reactive
    async def loop(self, channel, msg):
        """Callback on new data."""
        self.out('new tick triggered on {}: {}'.format(channel, msg))
        bits_stream = [self.state() for _ in range(self.sequence)]
        self.out('toggling pin state: {}'.format(bits_stream))
        # ...
        # ... toggle pin state here
        # ...
        return 'acknowledged'
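One detail worth double-checking in `state()`: `chance` is the probability of the pin reading 0, not 1. The standalone sketch below reproduces the same expression in a hypothetical `bits` helper, with an injected, seeded `random.Random` so runs are reproducible, and checks the extremes and the average.

```python
import random


def bits(chance, sequence, rng):
    """Same rule as IOPin.state(): 0 with probability `chance`, otherwise 1."""
    return [0 if rng.random() < chance else 1 for _ in range(sequence)]


rng = random.Random(42)
always_one = bits(0.0, 5, rng)    # random() is never < 0.0
always_zero = bits(1.0, 5, rng)   # random() is always < 1.0 (its range is [0.0, 1.0))
sample = bits(0.6, 10000, rng)
zero_ratio = sample.count(0) / len(sample)
print(always_one, always_zero, round(zero_ratio, 2))
```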
We finally need some glue to run both the clock and IOPin and test if the latter toggles its state when the former fires new ticks. The following snippet uses a convenient library, click 6.6, to parse command-line arguments.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim_fenc=utf-8
#
# filename: arduino.py

import sys
import asyncio

import click

# the clock coroutine and the IOPin class live in sketch.py
import sketch as sketches


@click.command()
@click.argument('sketch')
@click.option('-s', '--socket', default='localhost', help='Websocket to bind to')
@click.option('-p', '--port', default=8765, help='Websocket port to bind to')
@click.option('-t', '--tacks', default=5, help='Number of clock ticks')
@click.option('-d', '--delay', default=1, help='Clock intervals')
def main(sketch, **flags):
    if sketch == 'clock':
        # delegate the asynchronous execution to the event loop
        asyncio.get_event_loop().run_until_complete(sketches.clock(**flags))
    elif sketch == 'iopin':
        # arguments in the constructor go as is to our `setup` method
        sketches.IOPin(chance=0.6).run(flags['socket'], flags['port'])
    else:
        print('unknown sketch, please choose clock, iopin or buzzer')
        # click ignores the return value in standalone mode, so exit explicitly
        sys.exit(1)


if __name__ == '__main__':
    main()
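If you would rather not add the click dependency, the standard library's argparse can express the same interface. This is a swapped-in alternative, not what the script above uses; `build_parser` is a hypothetical helper, and its defaults simply mirror the click options.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description='Run an Arduino-like sketch.')
    parser.add_argument('sketch', choices=['clock', 'iopin'])
    parser.add_argument('-s', '--socket', default='localhost', help='Websocket to bind to')
    parser.add_argument('-p', '--port', type=int, default=8765, help='Websocket port to bind to')
    parser.add_argument('-t', '--tacks', type=int, default=5, help='Number of clock ticks')
    parser.add_argument('-d', '--delay', type=float, default=1, help='Clock intervals')
    return parser


# Parse an explicit argument list here; a real script would call parse_args() with no list
args = build_parser().parse_args(['clock', '--port', '9000'])
flags = vars(args)
sketch = flags.pop('sketch')
print(sketch, flags)
```

With `choices`, argparse rejects an unknown sketch name by itself, printing a usage error and exiting with status 2, so no manual `else` branch is needed.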
Don't forget to chmod +x the script, then start the server in a first terminal with ./arduino.py iopin. Once it is listening for connections, start the clock in a second terminal with ./arduino.py clock and watch them communicate! Both commands use the same default host and port, which is how they find each other.
We have a good start with our app, and now in Part 2 we will further explore peer-to-peer communication, service discovery, and the streaming machine-to-machine concept.
Xavier Bruhiere is a lead developer at AppTurbo in Paris, where he develops innovative prototypes to support company growth. He is addicted to learning, hacking on intriguing hot techs (both soft and hard), and practicing high intensity sports.