Tutorial#
Installation#
PostgreSQL, MySQL, and SQLite are currently supported.
Database Tables#
The SQLA broker requires two tables — message (active messages) and message_archive (completed/failed messages), with table names customizable via the broker's message_table_name and message_archive_table_name parameters. You can customize the tables to your liking (partition them, add indices, specify more specific data types like JSONB, etc.) as long as they generally conform to the following schemas. Schema check is done on startup if the brokers's validate_schema_on_start is True.
Broker#
from sqlalchemy.ext.asyncio import create_async_engine
from faststream.sqla import SqlaBroker
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlaBroker(engine=engine)
Broker parameters#
engine— SQLAlchemyAsyncEngineto use for requests to the database.message_table_name— Name of the table containing active messages. Defaults tomessage.message_archive_table_name— Name of the table containing completed/failed messages. Defaults tomessage_archive.validate_schema_on_start— IfTrue(default), validates that the configured tables exist and conform to the expected schema.graceful_timeout— Seconds to wait for in-flight messages to finish processing during shutdown.
Publishing#
The broker's and publisher's (see publishing) .publish() methods accept:
message— The message body.queue— The target queue name.headers— Optionaldict[str, str]of message headers.next_attempt_at— Optionaldatetime(with timezone) for delayed delivery.connection— Optional SQLAlchemyAsyncConnectionfor transactional publishing.
Delayed delivery#
The message won't be fetched until next_attempt_at if it is provided.
Transactional publishing#
When connection is provided, the message insert participates in the same database transaction as your other operations, enabling the transactional outbox pattern.
Subscribing#
Subscriber parameters#
queues— List of queue names to consume from.max_workers— Number of concurrent handler coroutines.retry_strategy— Called to determine if and how soon a Nack'ed message is retried. IfNone,AckPolicy.NACK_ON_ERRORhas the same effect asAckPolicy.REJECT_ON_ERROR.fetch_batch_size— Maximum number of messages to fetch in a single batch. A fetch's actual limit might be lower if the free capacity of the acquired-but-not-yet-processed messages set is smaller.overfetch_factor— Multiplier forfetch_batch_sizeto size the maximum size of the set of acquired-but-not-yet-processed messages.min_fetch_interval— Minimum interval between consecutive fetches. If the last fetch was full (returned as many messages as the fetch's limit), the next fetch happens after both (i) minimum fetch interval has passed, and (ii) capacity equal to the fetch batch size has freed up in the set of acquired-but-not-yet-processed messages.max_fetch_interval— Maximum interval between consecutive fetches.flush_interval— Interval between flushes of processed message state to the database.release_stuck_interval— Interval between checks for stuckPROCESSINGmessages.release_stuck_timeout— Interval sinceacquired_atafter which aPROCESSINGmessage is considered stuck and is released back toPENDING.max_deliveries— Maximum number of deliveries allowed for a message. If set, messages that have reached this limit are Reject'ed toFAILEDwithout processing. Note that this might violate the at-least-once processing semantics.ack_policy—AckPolicythat controls acknowledgement behavior.
Delayed retries#
When a message is Nack'ed (either manually or by AckPolicy.NACK_ON_ERROR), the retry_strategy determines if and when the message should be retried. All strategies accept max_attempts and max_total_delay_seconds as common parameters — if either limit is reached, the message is marked as FAILED instead of RETRYABLE. Otherwise, the strategy schedules the message for a retry determined by the returned next_attempt_at.
ConstantRetryStrategy#
Retries after a fixed delay_seconds every time.
LinearRetryStrategy#
First retry after initial_delay_seconds, then the delay increases by step_seconds with each attempt.
ExponentialBackoffRetryStrategy#
Delay starts at initial_delay_seconds and is multiplied by multiplier on each attempt. max_delay_seconds caps the delay.
ExponentialBackoffWithJitterRetryStrategy#
Same as exponential backoff, but adds random jitter (up to delay * jitter_factor) to spread out retries and avoid thundering herds.
ConstantWithJitterRetryStrategy#
Retries after base_delay_seconds plus random jitter in the range [-jitter_seconds, +jitter_seconds].
NoRetryStrategy#
No retries — the message is marked as FAILED on the first Nack.
Transactional outbox#
Implementing the transactional outbox pattern becomes as simple as the following.
Publish messages transactionally with your other database operations.
And relay the messages from the database to another broker.