Unit Testing aiohttp Clients - part 2

In the previous post we designed our testing infrastructure to work with aiohttp. We will now put that testing infrastructure through a baptism by fire.

  1. Testing tools.
  2. Actual implementation (this post).

If you remember, we were designing a simple class to fetch the current price of bitcoin from the Kraken exchange. So far we only have the interface. Let's implement it and ensure its correctness.

We will use a TDD approach, that is: write tests against the interface first, then implement the code required to make tests pass.

Some Test-driven development

Writing the test cases

We have a single interface, with a single method. Not much to add. Here is a trimmed-down version of the main tests; you can see the full test suite in test_kraken.py.

@pytest.mark.asyncio
async def test_get_price(aiohttp_redirector):
    ''' get_price returns price of latest transaction '''
    async with CaseControlledTestServer() as server:
        aiohttp_redirector.add_server('api.kraken.com', 80, server.port)
        kraken = Kraken(session=aiohttp_redirector.session)

        task = asyncio.ensure_future(kraken.get_price('btcusd'))
        request = await server.receive_request()
        assert request.path_qs == '/0/public/Ticker?pair=XXBTZUSD'

        server.send_response(request,
            text='{"error":[],"result":{"XXBTZUSD":{"a":["3652.20000",'
                 '"2","2.000"],"b":["3651.10000","1","1.000"],'
                 '"c":["3652.20000","1.00000000"],"o":"3727.50000"}}}',
            content_type='application/json'
        )
        result = await task
        assert result == Decimal('3652.20000')


@pytest.mark.asyncio
async def test_get_price_connection_failure(aiohttp_redirector, unused_tcp_port):
    ''' failure to connect raises a QuotationError exception '''
    aiohttp_redirector.add_server('api.kraken.com', 80, unused_tcp_port)

    kraken = Kraken(session=aiohttp_redirector.session)
    with pytest.raises(QuotationError):
        await kraken.get_price('btcusd')


@pytest.mark.parametrize('payload', [
    pytest.param('foobar', id='not json'),
    pytest.param('{"foo":"bar"}', id='not in kraken format'),
    pytest.param('{"error":[], "result": {}}', id='missing data'),
    pytest.param(
        '{"error":[], "result": {"XXBTZUSD":{}}}',
        id='missing price data'),
    pytest.param(
        '{"error":[], "result":{"XXBTZUSD":{"a":["3652.20000","2","2.000"],'
        '"b":["3651.10000","1","1.000"],"c":["123.456.789","1.00000000"],'
        '"o":"3727.50000"}}}',
        id='invalid price format'),
])
@pytest.mark.asyncio
async def test_get_price_invalid_payload(aiohttp_redirector, payload):
    ''' invalid data from server raises a PayloadError exception '''
    async with CaseControlledTestServer() as server:
        aiohttp_redirector.add_server('api.kraken.com', 80, server.port)
        kraken = Kraken(session=aiohttp_redirector.session)

        task = asyncio.ensure_future(kraken.get_price('btcusd'))
        request = await server.receive_request(timeout=TIMEOUT)

        server.send_response(request, text=payload,
                             content_type='application/json')
        with pytest.raises(PayloadError):
            await task

Running pytest, we check that our tests behave as expected at this stage. That is, they all run, and they all fail, since nothing is implemented yet. Great, let's move on to the implementation.

Implementation

Finally, we can now implement our Kraken class:

class Kraken:
    ROOT_URL = 'http://api.kraken.com/0/'
    TICKER_ENDPOINT = 'public/Ticker?pair={pair}'
    PAIRS = {'btcusd': 'XXBTZUSD', 'ethusd': 'XETHZUSD',
             'ltcusd': 'XLTCZUSD', 'xrpusd': 'XXRPZUSD'}

    def __init__(self, *, session):
        self._session = session

    async def get_price(self, pair):
        ''' Return latest trade price for given pair '''
        pair = self.PAIRS[pair]
        data = await self._fetch(self.TICKER_ENDPOINT.format(pair=pair))
        return decimal.Decimal(data[pair]['c'][0])

    async def _fetch(self, endpoint):
        ''' Fetch data from endpoint and parse result '''
        async with self._session.get(self.ROOT_URL + endpoint) as response:
            data = await response.json()
        error = data.get('error')
        if error:
            raise QuotationError(error[0])
        return data['result']

Some error handling was left aside for clarity. You can check the full implementation in kraken.py. Also, since our testing infrastructure does not support SSL yet, I worked around it by changing the URL from https to http for now.
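To give an idea of the kind of error handling that was left out, here is a minimal sketch of the payload parsing on its own, without any networking. It is not the code from kraken.py: the helper name parse_last_price is made up for illustration, and making PayloadError a subclass of QuotationError is an assumption on my part.

```python
import decimal
import json


class QuotationError(Exception):
    """Any failure to obtain a quotation."""


class PayloadError(QuotationError):
    """The server answered, but the payload could not be interpreted.

    Subclassing QuotationError is an assumption made for this sketch.
    """


def parse_last_price(text, pair):
    """Extract the latest trade price for `pair` from a Ticker response body.

    Hypothetical helper: the name and signature are for illustration only.
    """
    try:
        data = json.loads(text)
        error = data.get('error')
        if error:
            raise QuotationError(error[0])
        return decimal.Decimal(data['result'][pair]['c'][0])
    except QuotationError:
        # Errors reported by the exchange are passed through untouched.
        raise
    except (ValueError, KeyError, IndexError, TypeError, AttributeError,
            decimal.InvalidOperation) as exc:
        # Anything else means the payload did not have the expected shape.
        raise PayloadError('unexpected payload') from exc
```

Every payload from test_get_price_invalid_payload ends up in the second except clause: invalid JSON raises a ValueError subclass, missing keys raise KeyError, and a malformed number raises decimal.InvalidOperation.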

Checking progress

Before adding support for SSL, let's see how we fare:

(env) spectras$ pytest -v
============================= test session starts ==============================
collecting ... collected 8 items

tests/test_kraken.py::test_get_price_control PASSED                      [ 11%]
tests/test_kraken.py::test_get_price_invalid_pair PASSED                 [ 22%]
tests/test_kraken.py::test_get_price_connection_failure PASSED           [ 33%]
tests/test_kraken.py::test_get_price_invalid_certificate SKIPPED         [ 44%]
tests/test_kraken.py::test_get_price_invalid_payload[not json] PASSED    [ 55%]
tests/test_kraken.py::test_get_price_invalid_payload[not in kraken format] PASSED [ 66%]
tests/test_kraken.py::test_get_price_invalid_payload[missing data] PASSED [ 77%]
tests/test_kraken.py::test_get_price_invalid_payload[missing price data] PASSED [ 88%]
tests/test_kraken.py::test_get_price_invalid_payload[invalid price] PASSED [100%]

====================== 8 passed, 1 skipped in 0.12 seconds =====================

Great! Let's fix SSL now.

Self-signed certificate

All that is left from our initial objectives is SSL support. It is not as hard as it sounds. We have to create a self-signed certificate for use during the tests. Perfect use for a fixture.

As it is not the main concern of this article, I will spare you the code. You can find it in the GitHub repository, in certificate.py. Once loaded, that fixture creates a self-signed server certificate for the whole test session, valid only for the local system and expiring after 24 hours.

That fixture is available to tests as ssl_certificate.

Client side

We must ensure our test client accepts our certificate. This is done by loading it into the TCPConnector used by ClientSession.

To that end, we go back to the aiohttp_redirector we built earlier, and modify it so it injects the ssl_certificate fixture into the session:

@pytest.fixture
async def aiohttp_redirector(ssl_certificate):
    resolver = FakeResolver()
    connector = aiohttp.TCPConnector(
        resolver=resolver,
        ssl=ssl_certificate.client_context(),
        use_dns_cache=False,
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        yield _RedirectContext(add_server=resolver.add, session=session)

This is it: SSL certificate verification is now enabled, and the session trusts only one certificate, the temporary self-signed one.
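For the curious, a client context that trusts a single certificate can be built with the standard ssl module alone. The sketch below is not the code from certificate.py: make_client_context and its cafile parameter are hypothetical names, and I am only assuming that client_context() does something along these lines.

```python
import ssl


def make_client_context(cafile=None):
    """Build a client-side context that trusts only the given certificate.

    Hypothetical helper, for illustration only.
    """
    # PROTOCOL_TLS_CLIENT turns on certificate and hostname verification by
    # default. Unlike ssl.create_default_context(), it loads no system CA, so
    # the context trusts nothing until we add our own certificate.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if cafile is not None:
        # Path to the PEM file holding the self-signed certificate.
        context.load_verify_locations(cafile=cafile)
    return context
```

Starting from an empty trust store is what makes the "invalid certificate" scenario testable: any certificate other than the one we loaded is rejected.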

Server side

The RawTestServer we extend from aiohttp accepts an SSL context, but not in a convenient way: it requires dropping the async with that provides lifecycle management, and replacing it with a manual try…except block. It's cumbersome and easy to get wrong.

Let's pass it around ourselves instead. We update the constructor to accept the ssl context:

    def __init__(self, *, ssl=None, **kwargs):
        super().__init__(self._handle_request, **kwargs)
        self._ssl = ssl
        self._requests = asyncio.Queue()

Then we override start_server so it uses the stored SSL context automatically:

    async def start_server(self, **kwargs):
        kwargs.setdefault('ssl', self._ssl)
        await super().start_server(**kwargs)
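Why setdefault rather than a plain assignment? It keeps the door open for a caller who passes ssl explicitly. Here is a small self-contained sketch of the pattern; BaseServer is a made-up stand-in that records its arguments instead of opening a socket, not the aiohttp class.

```python
import asyncio


class BaseServer:
    """Hypothetical stand-in for the aiohttp test server base class."""

    def __init__(self):
        self.started_with = None

    async def start_server(self, **kwargs):
        # Record the arguments instead of opening a real socket.
        self.started_with = kwargs


class SslAwareServer(BaseServer):
    def __init__(self, *, ssl=None):
        super().__init__()
        self._ssl = ssl

    async def start_server(self, **kwargs):
        # Only fill in ssl when the caller did not pass one explicitly.
        kwargs.setdefault('ssl', self._ssl)
        await super().start_server(**kwargs)


async def demo():
    implicit = SslAwareServer(ssl='stored-context')
    await implicit.start_server()
    explicit = SslAwareServer(ssl='stored-context')
    await explicit.start_server(ssl='override')
    return implicit.started_with, explicit.started_with
```

Running demo() shows the stored context being used in the first case, and the explicit argument winning in the second.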

Updating test cases

We are done adding SSL support to our testing infrastructure. Let's use it for our Kraken tests! First, we put https back everywhere, then we update our tests accordingly.

That's 3 lines to update in every test:

@pytest.mark.asyncio
async def test_get_price(aiohttp_redirector, ssl_certificate):
    ''' get_price returns price of latest transaction '''
    async with CaseControlledTestServer(ssl=ssl_certificate.server_context()) as server:
        aiohttp_redirector.add_server('api.kraken.com', 443, server.port)
        #
        # --- rest of test case is unchanged ---
        #

We start pytest again and check that the passing test cases still pass. But now, they pass over SSL!

SSL Certificate test case

We can now write our final test case: verify that our client refuses to talk to a server that presents an untrusted certificate:

@pytest.mark.asyncio
async def test_get_price_invalid_certificate(aiohttp_redirector):
    with TemporaryCertificate() as wrong_cert:
        async with CaseControlledTestServer(ssl=wrong_cert.server_context()) as server:
            aiohttp_redirector.add_server('api.kraken.com', 443, server.port)

            kraken = Kraken(session=aiohttp_redirector.session)
            with pytest.raises(QuotationError) as exc_info:
                await kraken.get_price('btcusd')

    assert isinstance(exc_info.value.__cause__, aiohttp.ClientConnectorSSLError)

Notice how we start our server with a new, different temporary certificate, instead of using the trusted one from the fixture. We then ensure the client raises a QuotationError whose cause is the SSL error triggered by the untrusted certificate.

tests/test_kraken.py::test_get_price_invalid_certificate PASSED          [ 44%]
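The __cause__ attribute checked by that test is filled in by Python's raise ... from construct. As a reminder of how that works, here is a minimal sketch with no aiohttp involved: connect is a made-up stand-in for the HTTP call, and ConnectionError plays the role of aiohttp.ClientConnectorSSLError.

```python
class QuotationError(Exception):
    """Raised when a quotation cannot be obtained."""


def connect():
    # Stand-in for the HTTP call; simulates a low-level transport failure.
    raise ConnectionError('certificate verify failed')


def get_price():
    try:
        connect()
    except ConnectionError as exc:
        # `from exc` stores the original error in __cause__, so callers and
        # tests can still inspect what went wrong underneath.
        raise QuotationError('could not reach the exchange') from exc
```

Callers only need to handle QuotationError, while the low-level details stay available for logging or, as here, for assertions.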

Conclusion

Our journey ends here for now. The final code is available on GitHub: a duly tested, fully asynchronous, simple price fetcher for the Kraken exchange.

We did not explore all the possibilities of the test server we built, but manually controlling requests and responses from the test case allows controlling the timing and ordering of parallel requests, a crucial point for testing more complex clients.
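To illustrate what controlling the ordering means, here is a rough in-memory sketch of the idea, with no HTTP involved. ManualResponder is a made-up class that mimics the receive-then-respond pattern of our test server; it is not the server we built.

```python
import asyncio


class ManualResponder:
    """Hypothetical in-memory stand-in for a test-controlled server."""

    def __init__(self):
        self._requests = asyncio.Queue()

    async def handle(self, name):
        # Called by the "client": park the request until the test answers it.
        reply = asyncio.get_running_loop().create_future()
        await self._requests.put((name, reply))
        return await reply

    async def receive_request(self):
        return await self._requests.get()


async def demo():
    server = ManualResponder()
    finished = []

    async def client(name):
        result = await server.handle(name)
        finished.append((name, result))

    tasks = [asyncio.ensure_future(client('first')),
             asyncio.ensure_future(client('second'))]
    first = await server.receive_request()
    second = await server.receive_request()
    # Answer in reverse order of arrival: the test decides who finishes first.
    second[1].set_result('B')
    await tasks[1]
    first[1].set_result('A')
    await tasks[0]
    return finished
```

Both requests are in flight at the same time, yet the test alone decides which one completes first. That is the property that makes race conditions in a client reproducible.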

Speaking of timing, the most observant should have noticed we did not handle one specific class of errors: timeouts. They are correct! But this is not specific to aiohttp clients, and will deserve an article of its own.

Did you enjoy this post? Did you design a better way to test asynchronous HTTP clients? Let me know!