Fixing timeout errors on code running on AWS

The other day, I was running my code in an AWS EKS cluster which was behind a NAT gateway, and the code, which was running perfectly locally, started exhibiting weird behavior.

To give an overview of what the code was doing: it was making an API POST request which took more than 5-6 mins in some cases to return a response. So, as a sane thing to do, we added a request timeout of 10 mins. This is how the sample code looked.

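from httpx import AsyncClient

async def call_service() -> None:
    # Allow up to 10 minutes (600 seconds) for the slow endpoint to respond.
    async with AsyncClient(base_url="https://the-called-service", timeout=600) as client:
        _ = await client.post("/a_long_open_connection")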

Assuming the connection returns in 4-5 mins, we should be seeing successes. But on the cluster, the requests were getting timed out after 600 seconds.

Now this was weird, as AWS has a pretty reliable network and considering that this was running quite nicely on my shitty home internet, something was wrong.

After a bit of searching, I found this on an AWS page.

Internet Connection Drops after 350 seconds

Problem
Your instances can access the internet, but the connection drops after 350 seconds.

Cause
If a connection that's using a NAT gateway is idle for 350 seconds or more, the connection times out.
When a connection times out, a NAT gateway returns an RST packet to any resources behind the NAT gateway that attempt to continue the connection (it does not send a FIN packet).

Solution
To prevent the connection from being dropped, you can initiate more traffic over the connection. Alternatively, you can enable TCP keepalive on the instance with a value less than 350 seconds.

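By default, Linux only sends the first TCP keepalive probe after 2 hours of idle time (and only on sockets that have keepalive enabled), and the NAT gateway's 350 seconds is far less than that. That would explain what we saw: the connection sits idle while the service works on the request, the NAT gateway drops it at 350 seconds, the response never makes it back, and the client gives up at its 600-second timeout. As to why AWS did it, I am not sure. And since the code runs on an EKS cluster which I have no control over, we needed a way to fix this from the application side.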
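For reference, the TCP keepalive route from the AWS Solution does not strictly need host-level changes, because keepalive can also be switched on per socket. Here is a minimal sketch using only the standard library. The helper name `enable_keepalive` and the chosen values (300, 30, 3) are my own assumptions, and the `TCP_KEEP*` constants are not available on every platform, hence the `hasattr` checks. How to hand such a socket (or these options) to httpx depends on the httpx version, so that part is left out.

```python
import socket


def enable_keepalive(
    sock: socket.socket,
    idle_seconds: int = 300,      # assumed value, kept below the 350 s NAT idle limit
    interval_seconds: int = 30,   # assumed value, gap between unanswered probes
    probe_count: int = 3,         # assumed value, probes before the connection is declared dead
) -> socket.socket:
    """Turn on TCP keepalive for one socket (hypothetical helper)."""
    # Enable keepalive on this socket; without this the kernel never probes.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The tuning knobs below exist on Linux; guard them for other platforms.
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle_seconds)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_seconds)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, probe_count)
    return sock
```

With settings like these, the kernel sends a probe after 300 idle seconds, so the NAT gateway sees traffic on the same connection before its 350-second limit. I did not go this way in my code, so treat it as an untested alternative.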

So, how do we keep this connection alive? Well, let's just send periodic calls to some other endpoint which returns instantaneously.

So, we write a periodic async wrapper like so.

from __future__ import annotations

import asyncio
import logging
from asyncio import ensure_future
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Union

from starlette.concurrency import run_in_threadpool

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT]


def repeat_every(
        *,
        seconds: float,
        wait_first: bool = False,
        logger: logging.Logger | None = None,
        raise_exceptions: bool = False,
        max_repetitions: int | None = None,
) -> NoArgsNoReturnDecorator:
    """
    This function returns a decorator that modifies a function, so it is periodically re-executed after its first call.

    The function it decorates should accept no arguments and return nothing. If necessary, this can be accomplished
    by using `functools.partial` or otherwise wrapping the target function prior to decoration.

    Parameters
    ----------
    seconds: float
        The number of seconds to wait between repeated calls
    wait_first: bool (default False)
        If True, the function will wait for a single period before the first call
    logger: Optional[logging.Logger] (default None)
        The logger to use to log any exceptions raised by calls to the decorated function.
        If not provided, exceptions will not be logged by this function (though they may be handled by the event loop).
    raise_exceptions: bool (default False)
        If True, errors raised by the decorated function will be raised to the event loop's exception handler.
        Note that if an error is raised, the repeated execution will stop.
        Otherwise, exceptions are just logged and the execution continues to repeat.
        See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_exception_handler for more info.
    max_repetitions: Optional[int] (default None)
        The maximum number of times to call the repeated function. If `None`, the function is repeated forever.
    """

    def decorator(func: NoArgsNoReturnAsyncFuncT | NoArgsNoReturnFuncT) -> NoArgsNoReturnAsyncFuncT:
        """
        Converts the decorated function into a repeated, periodically-called version of itself.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)

        @wraps(func)
        async def wrapped() -> None:
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            await func()  # type: ignore
                        else:
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        if logger is not None:
                            formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
                            logger.error(formatted_exception)
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)

            ensure_future(loop())

        return wrapped

    return decorator

Thanks to the awesome fastapi-utils package, which this code is borrowed from. You can check the project out here.

And then we modify our initial code like so.

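import asyncio
from httpx import AsyncClient

async def call_service() -> None:
    async with AsyncClient(base_url="https://the-called-service", timeout=600) as client:
        # repeat_every is the decorator defined above.
        # Ping a fast endpoint every 60 seconds while the long request is in flight.
        @repeat_every(seconds=60)
        async def continuous_poll() -> None:
            _ = await client.get("/healthcheck")

        # Calling the wrapped function schedules the polling loop in the background.
        _ = asyncio.create_task(continuous_poll())
        # The long request has to run inside the `async with`, while the client is still open.
        _ = await client.post("/a_long_open_connection")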
    

And voilà, it now works.
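One thing the snippet above leaves open is stopping the poller once the long request has finished. Here is a minimal stdlib-only sketch of the same idea that cancels the background pings when the long call returns. The names `run_with_heartbeat`, `long_call` and `heartbeat` are hypothetical, and this is a simplified stand-in, not the fastapi-utils decorator.

```python
import asyncio
import contextlib
import logging
from typing import Any, Awaitable, Callable


async def run_with_heartbeat(
    long_call: Callable[[], Awaitable[Any]],
    heartbeat: Callable[[], Awaitable[Any]],
    interval: float,
) -> Any:
    """Await long_call() while calling heartbeat() every `interval` seconds."""

    async def beat() -> None:
        while True:
            await asyncio.sleep(interval)
            try:
                await heartbeat()
            except Exception:
                # A failed ping should not kill the loop or the long call.
                logging.getLogger(__name__).exception("heartbeat failed")

    task = asyncio.create_task(beat())
    try:
        return await long_call()
    finally:
        # Stop pinging as soon as the long call returns or raises.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

With the client from earlier, this could be called as `await run_with_heartbeat(lambda: client.post("/a_long_open_connection"), lambda: client.get("/healthcheck"), 60)`, inside the `async with` block.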

That's all. Thanks for reading.