Skip to content

Tutorial#

Installation#

PostgreSQL, MySQL, and SQLite are currently supported.

pip install "faststream[sqla-postgres]"
pip install "faststream[sqla-mysql]"
pip install "faststream[sqla-sqlite]"

Database Tables#

The SQLA broker requires two tables — message (active messages) and message_archive (completed/failed messages), with table names customizable via the broker's message_table_name and message_archive_table_name parameters. You can customize the tables to your liking (partition them, add indices, specify more specific data types like JSONB, etc.) as long as they generally conform to the following schemas. The schema check is performed on startup if the broker's validate_schema_on_start is True.

from datetime import datetime, timezone

# NOTE: the original example imported `Table` twice (once alongside MetaData
# and again in the grouped import); merged into a single grouped import.
from sqlalchemy import (
    JSON,
    BigInteger,
    Column,
    DateTime,
    Enum,
    LargeBinary,
    MetaData,
    SmallInteger,
    String,
    Table,
)

from faststream.sqla.message import SqlaMessageState

metadata = MetaData()

# Active messages: rows live here from publish until completion/failure,
# after which they are moved to `message_archive`.
message = Table(
    "message",
    metadata,
    Column("id", BigInteger, primary_key=True),
    Column("queue", String(255), nullable=False, index=True),
    Column("headers", JSON, nullable=True),
    Column("payload", LargeBinary, nullable=False),
    Column(
        "state",
        Enum(SqlaMessageState),
        nullable=False,
        index=True,
        server_default=SqlaMessageState.PENDING.name,
    ),
    Column("attempts_count", SmallInteger, nullable=False, default=0),
    Column("deliveries_count", SmallInteger, nullable=False, default=0),
    Column(
        "created_at",
        DateTime,
        nullable=False,
        # Timestamps are stored as naive UTC: generated with timezone.utc,
        # then tzinfo stripped for a plain DateTime column.
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    ),
    Column("first_attempt_at", DateTime),
    Column(
        "next_attempt_at",
        DateTime,
        nullable=False,
        # Indexed because fetching polls on this column (delayed delivery).
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
        index=True,
    ),
    Column("last_attempt_at", DateTime),
    Column("acquired_at", DateTime),
)


# Completed/failed messages: same columns as `message`, minus the scheduling
# columns (`next_attempt_at`, `acquired_at`), plus `archived_at`.
message_archive = Table(
    "message_archive",
    metadata,
    Column("id", BigInteger, primary_key=True),
    Column("queue", String(255), nullable=False, index=True),
    Column("headers", JSON, nullable=True),
    Column("payload", LargeBinary, nullable=False),
    Column("state", Enum(SqlaMessageState), nullable=False, index=True),
    Column("attempts_count", SmallInteger, nullable=False),
    Column("deliveries_count", SmallInteger, nullable=False),
    Column("created_at", DateTime, nullable=False),
    Column("first_attempt_at", DateTime),
    Column("last_attempt_at", DateTime),
    Column(
        "archived_at",
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    ),
)

Broker#

from sqlalchemy.ext.asyncio import create_async_engine

from faststream.sqla import SqlaBroker

# The broker wraps any SQLAlchemy AsyncEngine; PostgreSQL via asyncpg shown here.
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlaBroker(engine=engine)

Broker parameters#

  • engine — SQLAlchemy AsyncEngine to use for requests to the database.
  • message_table_name — Name of the table containing active messages. Defaults to message.
  • message_archive_table_name — Name of the table containing completed/failed messages. Defaults to message_archive.
  • validate_schema_on_start — If True (default), validates that the configured tables exist and conform to the expected schema.
  • graceful_timeout — Seconds to wait for in-flight messages to finish processing during shutdown.

Publishing#

from datetime import datetime, timedelta, timezone

from sqlalchemy.ext.asyncio import create_async_engine

from faststream import FastStream
from faststream.sqla import SqlaBroker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlaBroker(engine=engine)
app = FastStream(broker)

# Publisher bound to the SQLA broker; the target queue is passed per call.
publisher_sqla = broker.publisher()

@app.after_startup
async def publish_examples():
    # Inserts one message into the "my_queue" queue.
    await publisher_sqla.publish("Hello, SQLA!", queue="my_queue")

The broker's and the publisher's .publish() methods (see Publishing above) accept:

  • message — The message body.
  • queue — The target queue name.
  • headers — Optional dict[str, str] of message headers.
  • next_attempt_at — Optional datetime (with timezone) for delayed delivery.
  • connection — Optional SQLAlchemy AsyncConnection for transactional publishing.

Delayed delivery#

The message won't be fetched until next_attempt_at if it is provided.

    await publisher_sqla.publish(
        "Process me later",
        queue="my_queue",
        next_attempt_at=datetime.now(timezone.utc) + timedelta(minutes=5),
    )

Transactional publishing#

When connection is provided, the message insert participates in the same database transaction as your other operations, enabling the transactional outbox pattern.

    async with engine.begin() as connection:
        # ... your other database operations using `connection` ...
        await publisher_sqla.publish(
            "Transactional message",
            queue="my_queue",
            connection=connection,
        )

Subscribing#

from sqlalchemy.ext.asyncio import create_async_engine

from faststream import FastStream
from faststream.middlewares.acknowledgement.config import AckPolicy
from faststream.sqla import SqlaBroker
from faststream.sqla.retry import ConstantRetryStrategy

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlaBroker(engine=engine)
app = FastStream(broker)


@broker.subscriber(
    queues=["my_queue"],  # queue names to consume from
    max_workers=10,  # concurrent handler coroutines
    # Retry Nack'ed messages after a fixed 5s delay, at most 3 attempts.
    retry_strategy=ConstantRetryStrategy(
        delay_seconds=5,
        max_attempts=3,
        max_total_delay_seconds=None,
    ),
    min_fetch_interval=0.1,  # minimum seconds between consecutive fetches
    max_fetch_interval=1,  # maximum seconds between consecutive fetches
    fetch_batch_size=10,  # max messages fetched per batch
    overfetch_factor=2,  # buffer size = fetch_batch_size * overfetch_factor
    flush_interval=1,  # seconds between flushes of processed state to the DB
    release_stuck_interval=60,  # how often to look for stuck PROCESSING messages
    release_stuck_timeout=60*5,  # seconds since acquired_at before a message counts as stuck
    max_deliveries=10,  # reject to FAILED once this delivery count is reached
    ack_policy=AckPolicy.NACK_ON_ERROR,  # Nack (schedule retry) on handler errors
)
async def handler(msg: str):
    print(msg)

Subscriber parameters#

  • queues — List of queue names to consume from.
  • max_workers — Number of concurrent handler coroutines.
  • retry_strategy — Called to determine if and how soon a Nack'ed message is retried. If None, AckPolicy.NACK_ON_ERROR has the same effect as AckPolicy.REJECT_ON_ERROR.
  • fetch_batch_size — Maximum number of messages to fetch in a single batch. A fetch's actual limit might be lower if the free capacity of the acquired-but-not-yet-processed messages set is smaller.
  • overfetch_factor — Multiplier for fetch_batch_size to size the maximum size of the set of acquired-but-not-yet-processed messages.
  • min_fetch_interval — Minimum interval between consecutive fetches. If the last fetch was full (returned as many messages as the fetch's limit), the next fetch happens after both (i) minimum fetch interval has passed, and (ii) capacity equal to the fetch batch size has freed up in the set of acquired-but-not-yet-processed messages.
  • max_fetch_interval — Maximum interval between consecutive fetches.
  • flush_interval — Interval between flushes of processed message state to the database.
  • release_stuck_interval — Interval between checks for stuck PROCESSING messages.
  • release_stuck_timeout — Interval since acquired_at after which a PROCESSING message is considered stuck and is released back to PENDING.
  • max_deliveries — Maximum number of deliveries allowed for a message. If set, messages that have reached this limit are Reject'ed to FAILED without processing. Note that this might violate the at-least-once processing semantics.
  • ack_policy — AckPolicy that controls acknowledgement behavior.

Delayed retries#

When a message is Nack'ed (either manually or by AckPolicy.NACK_ON_ERROR), the retry_strategy determines if and when the message should be retried. All strategies accept max_attempts and max_total_delay_seconds as common parameters — if either limit is reached, the message is marked as FAILED instead of RETRYABLE. Otherwise, the strategy schedules the message for a retry determined by the returned next_attempt_at.

ConstantRetryStrategy#

Retries after a fixed delay_seconds every time.

constant = ConstantRetryStrategy(
    delay_seconds=5,
    max_attempts=3,
    max_total_delay_seconds=None,
)

LinearRetryStrategy#

First retry after initial_delay_seconds, then the delay increases by step_seconds with each attempt.

linear = LinearRetryStrategy(
    initial_delay_seconds=1,
    step_seconds=2,
    max_attempts=5,
    max_total_delay_seconds=60,
)

ExponentialBackoffRetryStrategy#

Delay starts at initial_delay_seconds and is multiplied by multiplier on each attempt. max_delay_seconds caps the delay.

exponential = ExponentialBackoffRetryStrategy(
    initial_delay_seconds=1,
    multiplier=2.0,
    max_delay_seconds=60,
    max_attempts=8,
    max_total_delay_seconds=300,
)

ExponentialBackoffWithJitterRetryStrategy#

Same as exponential backoff, but adds random jitter (up to delay * jitter_factor) to spread out retries and avoid thundering herds.

exponential_jitter = ExponentialBackoffWithJitterRetryStrategy(
    initial_delay_seconds=1,
    multiplier=2.0,
    max_delay_seconds=60,
    jitter_factor=0.5,
    max_attempts=8,
    max_total_delay_seconds=300,
)

ConstantWithJitterRetryStrategy#

Retries after base_delay_seconds plus random jitter in the range [-jitter_seconds, +jitter_seconds].

constant_jitter = ConstantWithJitterRetryStrategy(
    base_delay_seconds=5,
    jitter_seconds=2,
    max_attempts=3,
    max_total_delay_seconds=None,
)

NoRetryStrategy#

No retries — the message is marked as FAILED on the first Nack.

no_retry = NoRetryStrategy()

Transactional outbox#

Implementing the transactional outbox pattern becomes as simple as the following.

Publish messages transactionally with your other database operations.

from sqlalchemy.ext.asyncio import create_async_engine
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.sqla import SqlaBroker, SqlaMessage
from faststream.middlewares.acknowledgement.config import AckPolicy
from faststream.sqla.retry import ExponentialBackoffRetryStrategy

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker_sqla = SqlaBroker(engine=engine)
broker_kafka = KafkaBroker("127.0.0.1:9092")
# The SQLA broker runs the app; the Kafka broker is only connected on startup
# so it can be published to.
app = FastStream(broker_sqla, on_startup=[broker_kafka.connect])
publisher_sqla = broker_sqla.publisher()


@app.after_startup # just an example
async def publish_examples():
    # engine.begin() opens a transaction; the message insert below commits
    # (or rolls back) together with the other operations on `connection`.
    async with engine.begin() as connection:
        # ... your other database operations using `connection` ...
        await publisher_sqla.publish(
            {"message": "Hello, SQLA!"},
            queue="sqla_queue",
            connection=connection,
        )

And relay the messages from the database to another broker.

publisher_kafka = broker_kafka.publisher("kafka_topic")


# The handler's return value is forwarded to Kafka by the publisher decorator.
@publisher_kafka
@broker_sqla.subscriber(
    queues=["sqla_queue"],
    max_workers=10,
    # Unlimited attempts; exponential backoff capped at 5 minutes per delay
    # and 6 hours of total accumulated delay.
    retry_strategy=ExponentialBackoffRetryStrategy(
        initial_delay_seconds=1,
        multiplier=2,
        max_delay_seconds=60 * 5,
        max_total_delay_seconds=60 * 60 * 6,
        max_attempts=None,
    ),
    max_fetch_interval=1,
    min_fetch_interval=0,
    fetch_batch_size=10,
    overfetch_factor=1.5,
    flush_interval=3,
    release_stuck_interval=5,
    release_stuck_timeout=60 * 60,
    max_deliveries=20,
    ack_policy=AckPolicy.NACK_ON_ERROR,
)
async def handle_msg(msg_body: dict) -> dict:
    return msg_body