Use with SQLAlchemy#

Awaitlet can be combined with code that uses SQLAlchemy in one or both of two ways; one is by using SQLAlchemy’s asyncio API, and the other is using SQLAlchemy’s sync API with an asyncio driver.

For the examples that follow, we’ll make use of the following SQLAlchemy ORM model:

from __future__ import annotations

import datetime
from typing import Optional

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[Optional[str]]
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now()
    )

Following the ORM model, we will also have a global async engine / session setup, using the asyncpg PostgreSQL driver:

from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    "postgresql+asyncpg://scott:tiger@localhost/test",
    echo=True,
)
async_session = async_sessionmaker(async_engine, expire_on_commit=False)

Using SQLAlchemy’s asyncio from non-async functions in asyncio programs#

If we have a program that makes use of SQLAlchemy asyncio, we can call upon async defs which use this database logic from functions that are not themselves declared as async. Suppose our program had two async functions that use SQLAlchemy’s async API directly:

async def setup_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)


async def work_with_data():
    async with async_session() as session:
        async with session.begin():
            session.add_all(
                [
                    A(data="a1"),
                    A(data="a2"),
                    A(data="a3"),
                ]
            )

        stmt = select(A.data)

        result = await session.scalars(stmt)
        return result.all()

(fun fact, the SQLAlchemy conn.run_sync() calls in the above example are essentially equivalent to using awaitlet’s async_def() call)

If we had intermediary code that was not written to use asyncio, but wanted to be able to call these functions directly when the overall program is run in an asyncio context, we could achieve that as follows:

import asyncio
import awaitlet

def call_async_db_code():
    awaitlet.awaitlet(setup_tables())
    result = awaitlet.awaitlet(work_with_data())
    return result

async def front_facing_asyncio_facade():
    result = await awaitlet.async_def(call_async_db_code)
    print(f"Result: {result}")

asyncio.run(front_facing_asyncio_facade())

Above, front_facing_asyncio_facade() represents code we’ve written to present an asyncio front to our application. call_async_db_code() represents some part of the code that is written in traditional blocking style but has some areas that want to call into async code. The async_def() function enters the blocking function into an implicit async context, allowing the awaitlet() function to call out to real awaitables.

Using SQLAlchemy’s sync API in Asyncio Programs#

In this pattern, we present the more compelling case of a large codebase that’s written to use SQLAlchemy’s traditional blocking style API. Given the above methods written in blocking style:

sessionmaker = sessionmaker(...)

def setup_tables():
    with engine.begin() as conn:
        Base.metadata.drop_all(conn)
        Base.metadata.create_all(conn)


def work_with_data():
    with sessionmaker() as session:
        with session.begin():
            session.add_all(
                [
                    A(data="a1"),
                    A(data="a2"),
                    A(data="a3"),
                ]
            )

        stmt = select(A.data)

        result = session.scalars(stmt)
        return result.all()

We’ll note above the code is the same code as used previously, except we see there is a traditional blocking style sessionmaker() in use. The goal is to run the above code against an asyncio database driver, in this case asyncpg. How can we achieve this?

Firstly, we continue to use the create_async_engine() call to create our engine. In SQLAlchemy, there is a blocking style Engine object embedded in the AsyncEngine, however it links to an asyncio driver and also modifies some connection pool behaviors to be async compatible. The accessibility of this Engine is part of SQLAlchemy’s public async API. So here, when we know our program is using an asyncio driver, we create the engine as we did previously, then link the Engine to our sessionmaker():

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker

async_engine = create_async_engine(
    "postgresql+asyncpg://scott:tiger@localhost/test",
    echo=True,
)

sessionmaker = sessionmaker(async_engine.engine)

We can then call upon our setup_tables() and work_with_data() functions only using async_def(); SQLAlchemy itself will make use of its internal form of awaitlet() which is compatible with ours:

import asyncio
import awaitlet

def call_async_db_code():
    setup_tables()
    result = work_with_data()
    return result

async def front_facing_asyncio_facade():
    result = await awaitlet.async_def(call_async_db_code)
    print(f"Result: {result}")

asyncio.run(front_facing_asyncio_facade())

Putting the program segments together we create a fully runnable example below:

from __future__ import annotations

import asyncio
import datetime
from typing import Optional

import awaitlet
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[Optional[str]]
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now()
    )


async_engine = create_async_engine(
    "postgresql+asyncpg://scott:tiger@localhost/test",
    echo=True,
)

engine = async_engine.engine
sessionmaker = sessionmaker(async_engine.engine)

def setup_tables():
    with engine.begin() as conn:
        Base.metadata.drop_all(conn)
        Base.metadata.create_all(conn)


def work_with_data():
    with sessionmaker() as session:
        with session.begin():
            session.add_all(
                [
                    A(data="a1"),
                    A(data="a2"),
                    A(data="a3"),
                ]
            )

        stmt = select(A.data)

        result = session.scalars(stmt)
        return result.all()

def call_async_db_code():
    setup_tables()
    result = work_with_data()
    return result

async def front_facing_asyncio_facade():
    result = await awaitlet.async_def(call_async_db_code)
    print(f"Result: {result}")

asyncio.run(front_facing_asyncio_facade())