Unit Testing aiohttp Clients

Asynchronous code has become a mainstream paradigm in Python over the last few years. Leveraging coroutine support to write non-blocking code is very convenient.

But it opens up a whole new uncharted territory. Testing, notably, is significantly harder: exceptions might go unnoticed, unit tests might end without waiting for all asynchronous code to complete, and so on.

It does, however, enable interesting new techniques for testing. In this article, we shall test asynchronous client-side code written using the amazing aiohttp. We will do this in two parts:

  1. Testing tools (this post).
  2. Actual implementation.

Objectives

Fetching bitcoin price

So here are our requirements: we would like to fetch the latest bitcoin price from the Kraken exchange, using the following interface:

class QuotationProvider:
    async def get_price(self, pair):
        ''' Get latest trade price for given pair.
        Args:
            pair (str): name of a pair symbol on exchange.
        Returns:
            Decimal: latest price for the symbol.
        Raises:
            PayloadError: if server response could not be understood.
            QuotationError: if quotation is not available.
            ValueError: if `pair` is not a valid pair symbol.
        '''

class QuotationError(Exception): pass
class PayloadError(QuotationError): pass
Interface definition

Here is the expected use, assuming we built a Kraken class that implements the above interface:

>>> xchg = Kraken(session=aiohttp.ClientSession())
>>> await xchg.get_price('btcusd')
Decimal('3967.40000')
Example session using apython interpreter.
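
To make the error contract concrete, here is a minimal sketch of how calling code might tell the three failure modes apart. StubProvider and describe are hypothetical names introduced for illustration only; they are not part of the Kraken client, and the stub's validation rule is an assumption.

```python
import asyncio
from decimal import Decimal


class QuotationError(Exception):
    pass


class PayloadError(QuotationError):
    pass


class StubProvider:
    """Hypothetical in-memory provider; stands in for a real exchange client."""

    def __init__(self, prices):
        self._prices = prices  # {pair: price as a string}

    async def get_price(self, pair):
        # Assumed validation rule, for illustration: symbols are alphanumeric.
        if not isinstance(pair, str) or not pair.isalnum():
            raise ValueError('invalid pair symbol: %r' % (pair,))
        if pair not in self._prices:
            raise QuotationError('no quotation for %s' % pair)
        return Decimal(self._prices[pair])


async def describe(provider, pair):
    """Map each outcome of get_price() to a short label."""
    try:
        return str(await provider.get_price(pair))
    except PayloadError:
        # Must come first: PayloadError is a subclass of QuotationError.
        return 'bad payload'
    except QuotationError:
        return 'unavailable'
    except ValueError:
        return 'invalid pair'
```

Because PayloadError derives from QuotationError, a caller that only cares about "no price available" can catch QuotationError alone and still cover malformed payloads.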

Simple enough, right? Now, before we dive into code, how do we ensure it works fine? Or:
How do we test it properly?

Testing goals

Our final goal for this post is to unit-test this quotation client thoroughly and cleanly. What does that mean? It means we want our tests to be:

Isolated

Testing must not depend on external resources. In particular, tests must not perform actual requests against Kraken's servers.

Faithful

Test cases should be as close as possible to real-world conditions. We want to strive for behavioral testing, rather than checking implementation details.

This means we want to set up an actual HTTP server for the tested code to interact with, under the control of the test case. The good news is that, with asyncio, we can actually run one from within the test.

Comprehensive

In particular, we want to test error conditions and security features. Namely, we want to test that we will not accept data from a server that has an invalid SSL certificate. This implies our tests cannot circumvent the SSL layer.
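
As a point of reference, this is what "not circumventing the SSL layer" amounts to in terms of the standard library's ssl settings. The sketch below only inspects a default client context; the helper names are hypothetical, and how the client under test actually builds its context is outside the scope of this post.

```python
import ssl


def default_client_context():
    """Return the standard library's default client-side TLS context."""
    return ssl.create_default_context()


def verifies_certificates(context):
    """True if the context rejects invalid certificates and mismatched hostnames."""
    return context.verify_mode == ssl.CERT_REQUIRED and context.check_hostname
```

A test suite that switches either setting off to make its local server acceptable would no longer exercise the certificate checks we care about.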

Unintrusive

We want to test the actual code, not modify or patch it for testing, as this could hide bugs. In particular, if the client code uses URIs, we are not allowed to reach in and alter them.

Lastly, we will aim at making tests as convenient to write as possible. Most developers hate writing tests, and lowering the barrier is a reasonable way to encourage a more comprehensive test suite.

Test support tools

Checking out included batteries

The aiohttp library provides a basic testing toolset. It allows setting up a test webapp, creating a server instance to serve it, and optionally running it asynchronously in the background. For instance:

@pytest.mark.asyncio
async def test_example():
    app = aiohttp.web.Application()
    async def handler(request):
        return aiohttp.web.Response(text=SAMPLE_RESPONSE_FROM_KRAKEN)
    app.router.add_route('get', '/0/public/Ticker', handler)

    server = aiohttp.test_utils.TestServer(app)
    async with server:
        kraken = Kraken(server='localhost', port=server.port)
        price = await kraken.get_price('btcusd')
        assert price == Decimal('3602.20000')

This starts a test server on localhost that sends back a canned Kraken response when queried for /0/public/Ticker. This is all well and good, but it falls short in several ways:

  • Controlling the server throughout the test case is not easy. This was a simple example, but how about an endpoint with a more complex behavior (say, a REST API)?
  • It is cumbersome. This must be copy-pasted in each and every test case. It cannot be made a fixture since the handler is test-case specific.
  • We cannot control timing easily. Say we need to check the behavior of client code while it is waiting for the response.
  • It makes the test case hard to read, as code is out of order: we define how to respond to queries before they are made.

Let's build a better infrastructure for our tests.

Handling requests in test cases

The idea is simple: the test case is an asynchronous task. The test server is an asynchronous task as well. We should be able to use inter-task communication tools to have the server delegate request processing back to the test case.

Client request/response sequence under testcase control.
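
Before involving aiohttp, the handoff itself can be sketched with plain asyncio primitives. The Exchange class and the string "requests" below are hypothetical stand-ins, used only to show a Queue carrying requests one way and a Future carrying the reply back.

```python
import asyncio


class Exchange:
    """Hypothetical stand-in for the test server: hands each request to the test."""

    def __init__(self):
        self._requests = asyncio.Queue()

    async def handle(self, request):
        # Server side: publish the request, then sleep until the test answers.
        reply = asyncio.get_running_loop().create_future()
        self._requests.put_nowait((request, reply))
        return await reply

    async def receive(self):
        # Test side: wait for the next request and its reply slot.
        return await self._requests.get()


async def scenario():
    exchange = Exchange()
    client = asyncio.create_task(exchange.handle('GET /price'))

    request, reply = await exchange.receive()
    assert request == 'GET /price'
    assert not client.done()      # the client is still waiting for its answer

    reply.set_result('42')        # the test case decides what to answer, and when
    return await client
```

Note that the test can make assertions while the client is suspended, which is exactly the timing control the built-in approach lacked.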

Let's implement this workflow. We extend aiohttp's RawTestServer and provide our own request handler. This is the main idea: instead of handling the request, the handler pushes it to a queue and waits for the test case to provide the response.

import asyncio

import aiohttp.test_utils
import aiohttp.web


class CaseControlledTestServer(aiohttp.test_utils.RawTestServer):
    def __init__(self, **kwargs):
        super().__init__(self._handle_request, **kwargs)
        self._requests = asyncio.Queue()
        self._responses = {}                # {id(request): Future}

    async def close(self):
        ''' cancel all pending requests before closing '''
        for future in self._responses.values():
            future.cancel()
        await super().close()

    async def _handle_request(self, request):
        ''' push request to test case and wait until it provides a response '''
        self._responses[id(request)] = response = asyncio.Future()
        self._requests.put_nowait(request)
        try:
            # wait until test case provides a response
            return await response
        finally:
            del self._responses[id(request)]

    async def receive_request(self):
        ''' wait until test server receives a request '''
        return await self._requests.get()

    def send_response(self, request, *args, **kwargs):
        ''' send web response from test case to client code '''
        response = aiohttp.web.Response(*args, **kwargs)
        self._responses[id(request)].set_result(response)

Two things deserve an explanation:

  • We represent the response that is yet to come using a Future. The request handler awaits the Future, which puts it to sleep until Future.set_result() is called. When this happens, the request handler awakens, and await response evaluates to the value that was passed to Future.set_result(). That value is the Response, which the handler returns to aiohttp for sending over the network.
  • For proper cleanup, we override close() and cancel all outstanding futures. This will awaken waiting handlers immediately, causing them to raise a CancelledError exception. This ensures we don't leave blocked tasks behind.
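
Both behaviors can be observed in isolation with a few lines of plain asyncio. This is a minimal sketch; waiter and demo are hypothetical names, and the strings stand in for real responses.

```python
import asyncio


async def waiter(future, log):
    """Await a future and record how the wait ended."""
    try:
        log.append(('result', await future))
    except asyncio.CancelledError:
        log.append(('cancelled', None))
        raise  # let the cancellation propagate, as a request handler would


async def demo():
    loop = asyncio.get_running_loop()
    log = []
    answered, abandoned = loop.create_future(), loop.create_future()
    tasks = [
        asyncio.create_task(waiter(answered, log)),
        asyncio.create_task(waiter(abandoned, log)),
    ]
    await asyncio.sleep(0)            # let both waiters start and block

    answered.set_result('response')   # wakes the first waiter with a value
    abandoned.cancel()                # wakes the second with CancelledError

    await asyncio.gather(*tasks, return_exceptions=True)
    return log
```

One waiter ends with the value passed to set_result(), the other with a cancellation, and neither task is left blocked.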

Using our CaseControlledTestServer, we can rewrite the previous example like this:

@pytest.mark.asyncio
async def test_example():
    async with CaseControlledTestServer() as server:
        kraken = Kraken(server='localhost', port=server.port)

        task = asyncio.create_task(kraken.get_price('btcusd'))
        request = await server.receive_request()
        assert request.path_qs == '/0/public/Ticker?pair=XXBTZUSD'

        server.send_response(request, text=SAMPLE_RESPONSE_FROM_KRAKEN)
        price = await task
        assert price == Decimal('3602.20000')

Much better already. Notice how linear the test case has become? The test case walks through the steps of the asynchronous workflow, and gets access to the request object, which allows checking it, for example verifying the query string or ensuring some header is present.
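
When a request carries several query parameters, comparing the whole path_qs string ties the test to parameter order. One possible alternative, sketched here with the standard library only, is to compare the path and the parsed query separately; split_path_qs is a hypothetical helper, not something aiohttp provides.

```python
from urllib.parse import parse_qs, urlsplit


def split_path_qs(path_qs):
    """Split '/path?a=1&b=2' into ('/path', {'a': ['1'], 'b': ['2']})."""
    parts = urlsplit(path_qs)
    return parts.path, parse_qs(parts.query)
```

A test could then assert on split_path_qs(request.path_qs) and stay valid if the client reorders its parameters.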

Let's now fix the URI redirection we have ignored so far.

Rerouting network accesses

Up to this point, we have been manually providing server address and port to our client code. This scheme is inherently wrong though, as it breaks encapsulation: as it is, every single part of our project that needs to build a Kraken instance must know the official server address and port.

Let's fix this.

Global state in aiohttp is cleanly wrapped in a ClientSession object. That object holds connection pools, settings, a cookie database, etc. Classes that use aiohttp then accept a ClientSession object at construction (dependency injection). This makes it easy to share resources among components or isolate them, depending on the desired behavior.

The interesting part is that ClientSession does not know how to access the network. This is all abstracted out in a connector, which is injected into the session at creation: “here, anytime you need network access, use this connector”.

We are thus interested in altering the TCP connector, an instance of class aiohttp.TCPConnector. Digging deeper, we can see that in turn, that connector delegates address resolution to a resolver object.

This is our target: we will provide a custom resolver that resolves tested addresses to local test servers and purposefully fails all other addresses, ensuring no connection can be made to other servers.

Let's write the resolver first:

import socket


class FakeResolver:
    def __init__(self):
        self._servers = {}

    def add(self, host, port, target_port):
        self._servers[host, port] = target_port

    async def resolve(self, host, port=0, family=socket.AF_INET):
        try:
            fake_port = self._servers[host, port]
        except KeyError:
            raise OSError('No test server known for %s' % host)
        return [{
            'hostname': host,
            'host': '127.0.0.1',
            'port': fake_port,
            'family': socket.AF_INET,
            'proto': 0,
            'flags': socket.AI_NUMERICHOST,
        }]

Straightforward: we record added test servers, and resolve raises an OSError if the requested address does not match a known test server.
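
The resolver has no dependency on aiohttp, so its behavior can be checked on its own. The sketch below repeats the class so that it runs standalone; the port number 54321 and the example.org host are arbitrary values chosen for illustration.

```python
import asyncio
import socket


class FakeResolver:
    def __init__(self):
        self._servers = {}

    def add(self, host, port, target_port):
        self._servers[host, port] = target_port

    async def resolve(self, host, port=0, family=socket.AF_INET):
        try:
            fake_port = self._servers[host, port]
        except KeyError:
            raise OSError('No test server known for %s' % host)
        return [{
            'hostname': host,
            'host': '127.0.0.1',
            'port': fake_port,
            'family': socket.AF_INET,
            'proto': 0,
            'flags': socket.AI_NUMERICHOST,
        }]


async def demo():
    resolver = FakeResolver()
    resolver.add('api.kraken.com', 80, 54321)   # 54321: arbitrary local port

    # A registered (host, port) pair resolves to the loopback test server.
    hit = await resolver.resolve('api.kraken.com', 80)

    # Anything else is refused, so no request can escape to the network.
    try:
        await resolver.resolve('example.org', 80)
    except OSError as exc:
        miss = str(exc)

    return hit[0]['host'], hit[0]['port'], miss
```

Keying the mapping on both host and port means a server registered for port 80 will not answer for the same host on another port.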

Now we want to inject our resolver into a connector and make it conveniently available within a test-scoped ClientSession instance. A perfect use case for building a fixture:

from collections import namedtuple

import aiohttp
import pytest

_RedirectContext = namedtuple('RedirectContext', 'add_server session')

@pytest.fixture
async def aiohttp_redirector():
    resolver = FakeResolver()
    connector = aiohttp.TCPConnector(resolver=resolver, use_dns_cache=False)
    async with aiohttp.ClientSession(connector=connector) as session:
        yield _RedirectContext(add_server=resolver.add, session=session)

We can now use our fixture in our test case example:

@pytest.mark.asyncio
async def test_example(aiohttp_redirector):
    async with CaseControlledTestServer() as server:
        aiohttp_redirector.add_server('api.kraken.com', 80, server.port)
        kraken = Kraken(session=aiohttp_redirector.session)
        #
        # --- rest is unchanged ---
        #

Our Kraken class holds the knowledge of what it should connect to, and our tests use a custom resolver to direct those connections to the test server. We no longer break encapsulation.

Recap

In this first part, we have created our testing infrastructure.

  • Our tests are now isolated: we run them against a local, test-case controlled server.
  • Our tests are as faithful as possible. Perhaps integration tests would set up an actual web server along with our client code, but for unit tests this is as close to real conditions as we can get.
  • Our tests are more comprehensive. With actual TCP connections taking place, we can test all the error conditions that come with them.
  • Our tests are unintrusive. As long as the client code follows the best practices and uses dependency injection to get a ClientSession instance, we can test it with no change.
  • We left SSL aside for now.

In the next post, we will use our testing infrastructure to test and develop our Kraken feed.