Acknowledgment#

Because message processing can fail unexpectedly, FastStream provides an ack_policy parameter that controls when a message is acknowledged and whether it is acknowledged or rejected, depending on the outcome of processing.

AckPolicy#

AckPolicy is an enumerated type (Enum) in FastStream that specifies the message acknowledgment strategy. It determines how the system responds after receiving and processing a message.

Availability#

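AckPolicy is supported by the brokers covered in the Broker Behaviors section of this page: RabbitMQ, NATS JetStream, Redis Streams, Kafka, and SQLA.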

Usage#

To choose a policy, pass the ack_policy parameter when creating a subscriber:

from faststream import FastStream, Logger, AckPolicy
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber(
    "test",
    ack_policy=AckPolicy.REJECT_ON_ERROR,
)
async def handler(msg: str, logger: Logger) -> None:
    logger.info(msg)

Available Options#

Policy | Description | On Success | On Error
--- | --- | --- | ---
ACK_FIRST | ACK upon receipt, prior to processing | ACK | ACK
ACK | ACK after processing, regardless of exceptions | ACK | ACK
REJECT_ON_ERROR | ACK on successful processing, REJECT on exception (no redelivery) | ACK | REJECT
NACK_ON_ERROR | ACK on successful processing, NACK on exception (with redelivery) | ACK | NACK
MANUAL | No automatic acknowledgement. The user must complete the message manually via msg.ack(), msg.nack(), or msg.reject() | Manual | Manual
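The policy table above can be read as a small decision function. The sketch below is an illustrative model only, not FastStream's implementation: the AckPolicy enum is redefined locally so the snippet runs without FastStream installed, and resolve_command is a hypothetical helper name. It models which command is issued, not when it is issued (ACK_FIRST acknowledges before processing, ACK after).

```python
from enum import Enum
from typing import Optional


class AckPolicy(Enum):
    """Local stand-in for faststream.AckPolicy (assumption: member names match the table)."""

    ACK_FIRST = "ack_first"
    ACK = "ack"
    REJECT_ON_ERROR = "reject_on_error"
    NACK_ON_ERROR = "nack_on_error"
    MANUAL = "manual"


def resolve_command(policy: AckPolicy, failed: bool) -> Optional[str]:
    """Return the command the table assigns to a processing outcome.

    None means nothing is sent automatically (MANUAL).
    """
    if policy is AckPolicy.MANUAL:
        return None  # the handler must call msg.ack(), msg.nack(), or msg.reject()
    if policy in (AckPolicy.ACK_FIRST, AckPolicy.ACK):
        return "ACK"  # acknowledged whether or not the handler raised
    if not failed:
        return "ACK"  # both *_ON_ERROR policies acknowledge on success
    return "REJECT" if policy is AckPolicy.REJECT_ON_ERROR else "NACK"


for policy in AckPolicy:
    print(policy.name, resolve_command(policy, failed=False), resolve_command(policy, failed=True))
```

The loop prints one line per policy with the success and error outcomes, which should match the On Success and On Error columns.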

Broker Behaviors#

Here is how FastStream's ACK / NACK / REJECT commands map to brokers' behaviors:

Broker | ACK | NACK | REJECT
--- | --- | --- | ---
RabbitMQ | Protocol ack | Protocol nack | Protocol reject
NATS JetStream | Protocol ack | Protocol nak | Protocol term
Redis Streams | XACK call | Do nothing | Do nothing
Kafka | Commits offset | Seeks offset and reads the message again | Commits offset (same as ACK)
SQLA | Marks msg as COMPLETED | Marks msg as RETRYABLE if the retry strategy allows, FAILED otherwise | Marks msg as FAILED
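As a quick way to reason about the mapping above, the table can be transcribed into a lookup. This is a sketch for illustration: BROKER_ACTIONS and broker_action are hypothetical names, and the strings are descriptive labels copied from the table, not FastStream identifiers.

```python
from typing import Dict

# Transcribed from the broker behavior table; values are human-readable labels.
BROKER_ACTIONS: Dict[str, Dict[str, str]] = {
    "RabbitMQ": {"ACK": "protocol ack", "NACK": "protocol nack", "REJECT": "protocol reject"},
    "NATS JetStream": {"ACK": "protocol ack", "NACK": "protocol nak", "REJECT": "protocol term"},
    "Redis Streams": {"ACK": "XACK call", "NACK": "do nothing", "REJECT": "do nothing"},
    "Kafka": {"ACK": "commit offset", "NACK": "seek offset and re-read", "REJECT": "commit offset"},
    "SQLA": {
        "ACK": "mark COMPLETED",
        "NACK": "mark RETRYABLE if retry strategy allows, else FAILED",
        "REJECT": "mark FAILED",
    },
}


def broker_action(broker: str, command: str) -> str:
    """Look up what a FastStream command does on a given broker, per the table."""
    return BROKER_ACTIONS[broker][command]


# Brokers where REJECT has the same effect as ACK, or where NACK has no effect.
for name, actions in BROKER_ACTIONS.items():
    if actions["REJECT"] == actions["ACK"]:
        print(f"{name}: REJECT behaves like ACK")
    if actions["NACK"] == "do nothing":
        print(f"{name}: NACK has no effect")
```

The loop highlights the two rows that tend to surprise: on Kafka a rejected message is committed just like an acknowledged one, and on Redis Streams NACK and REJECT are no-ops.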

When to Use#

  • Use ACK_FIRST for high-throughput scenarios where some message loss is acceptable.
  • Use ACK if the message should be acknowledged after processing whether the handler succeeds or fails.
  • Use REJECT_ON_ERROR to permanently discard messages on failure.
  • Use NACK_ON_ERROR to have messages redelivered on failure.
  • Use MANUAL to control acknowledgment entirely yourself (for example, by calling msg.ack() in the handler).
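ACK_FIRST and ACK differ in when the acknowledgment happens, not in which command is sent. The toy simulation below illustrates that ordering under stated assumptions: simulate and failing_handler are hypothetical names, policies are plain strings, and nothing here reflects FastStream internals.

```python
from typing import Callable, List


def simulate(policy: str, handler: Callable[[str], None], message: str) -> List[str]:
    """Return the order of events for one delivery under ACK_FIRST or ACK."""
    events: List[str] = []
    if policy == "ACK_FIRST":
        events.append("ACK")  # acknowledged on receipt, before the handler runs
    try:
        handler(message)
        events.append("processed")
    except Exception:
        events.append("failed")
    if policy == "ACK":
        events.append("ACK")  # acknowledged after processing, even if it failed
    return events


def failing_handler(message: str) -> None:
    """Handler that always fails, to show what happens on error."""
    raise ValueError(f"cannot process {message!r}")


print(simulate("ACK_FIRST", failing_handler, "order-1"))  # ['ACK', 'failed']
print(simulate("ACK", failing_handler, "order-1"))        # ['failed', 'ACK']
```

In both runs the failed message is acknowledged and therefore not retried. With ACK_FIRST the acknowledgment has already been sent when processing starts, which is why that policy suits workloads that tolerate losing a message.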

Extended Examples#

Automatic Retry on Failure#

from faststream import FastStream, AckPolicy, Logger
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()
app = FastStream(broker)

@broker.subscriber("orders", ack_policy=AckPolicy.NACK_ON_ERROR)
async def process_order(msg: str, logger: Logger) -> None:
    logger.info(f"Processing: {msg}")
    raise Exception("processing failed")  # the message is nacked and redelivered

Manual Acknowledgment Handling#

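from faststream import FastStream, AckPolicy
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage

broker = KafkaBroker()
app = FastStream(broker)

# group_id is an illustrative value; Kafka commits offsets per consumer group
@broker.subscriber("events", group_id="events-group", ack_policy=AckPolicy.MANUAL)
async def handle_event(body: str, msg: KafkaMessage) -> None:
    # body is the decoded payload; msg is the message object exposing ack/nack/reject
    try:
        ...  # do_smth(body)
    except Exception:
        await msg.nack()  # or msg.reject()
    else:
        await msg.ack()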

You can also manage manual acknowledgement using middleware. For more information, see the error handling middleware documentation.