Symfony Messenger

Symfony Messenger messages can be dispatched to SQS, SNS, or EventBridge, while workers handle those messages on AWS Lambda.

Installation

This guide assumes that:

First, install the Bref-Symfony messenger integration:

composer require bref/symfony-messenger

Next, register the bundle in config/bundles.php:

config/bundles.php
return [
    // ...
    Bref\Symfony\Messenger\BrefMessengerBundle::class => ['all' => true],
];

SQS, SNS, and EventBridge can now be used with Symfony Messenger.

Usage

Symfony Messenger dispatches messages. To create a message, follow the Symfony Messenger documentation.

The examples in this documentation configure where messages are dispatched by building on the example from the Symfony documentation:

config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\MyMessage': async
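
The routing above refers to a message class named App\Message\MyMessage. As a minimal sketch of what such a message and its handler might look like, assuming PHP 8.1 and the AsMessageHandler attribute from Symfony Messenger; the orderId field and the MyMessageHandler class are hypothetical names used only for illustration:

```php
<?php declare(strict_types=1);

namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Hypothetical message: a plain PHP object carrying the data the worker needs.
final class MyMessage
{
    public function __construct(
        public readonly int $orderId, // illustrative payload field
    ) {
    }
}

// Hypothetical handler. In a real application it would usually live in its own
// file (for example under App\MessageHandler); both classes share a file here for brevity.
#[AsMessageHandler]
final class MyMessageHandler
{
    public function __invoke(MyMessage $message): string
    {
        // Runs on the worker, not in the process that dispatched the message.
        return sprintf('Processed order %d', $message->orderId);
    }
}
```

Because the message is routed to the async transport, the handler runs later in the worker rather than during the request that dispatched it.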

SQS

Amazon SQS is a message queue service similar to RabbitMQ. To use it, set the queue URL in the environment variable MESSENGER_TRANSPORT_DSN:

serverless.yml
provider:
    name: aws
    environment:
        MESSENGER_TRANSPORT_DSN: https://sqs.us-east-1.amazonaws.com/123456789/my-queue
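
To make the parts of that DSN explicit, here is a small sketch that splits a queue URL into its region, account ID, and queue name. The describeQueueUrl function is a hypothetical helper, not part of Bref or Symfony, and it assumes the URL follows the https://sqs.<region>.amazonaws.com/<account-id>/<queue-name> form used above:

```php
<?php declare(strict_types=1);

/**
 * Illustrative helper (not part of Bref or Symfony): splits an SQS queue URL
 * of the form https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>.
 *
 * @return array{region: string, accountId: string, queueName: string}
 */
function describeQueueUrl(string $queueUrl): array
{
    $host = (string) parse_url($queueUrl, PHP_URL_HOST);
    $path = trim((string) parse_url($queueUrl, PHP_URL_PATH), '/');

    $hostParts = explode('.', $host); // e.g. ['sqs', 'us-east-1', 'amazonaws', 'com']
    $pathParts = explode('/', $path); // e.g. ['123456789', 'my-queue']

    if (($hostParts[0] ?? '') !== 'sqs' || count($hostParts) < 4 || count($pathParts) !== 2) {
        throw new InvalidArgumentException("Unexpected SQS queue URL: $queueUrl");
    }

    return [
        'region' => $hostParts[1],
        'accountId' => $pathParts[0],
        'queueName' => $pathParts[1],
    ];
}
```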

The implementation uses the SQS transport provided by Symfony Amazon SQS Messenger, so all its features are supported. If you already use that transport, the transition to AWS Lambda should not require any change for dispatching messages.

However, instead of creating the SQS queue and the worker manually, you can use the Serverless Lift plugin.

First install the Lift plugin:

serverless plugin install -n serverless-lift

Then use the Queue construct in serverless.yml to create a queue and a worker:

serverless.yml
provider:
    # ...
    environment:
        # ...
        MESSENGER_TRANSPORT_DSN: ${construct:jobs.queueUrl}
 
functions:
    # ...
 
constructs:
    jobs:
        type: queue
        worker:
            handler: bin/consumer.php
            runtime: php-81
            timeout: 60 # in seconds

You will want to disable auto_setup to avoid extra SQS requests and permission issues.

config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: false

With that configuration, anytime a message is pushed to Symfony Messenger, it will be sent to SQS, and SQS will invoke our "worker" Lambda function so that it is processed.

💡

With Lift, AWS credentials (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) are automatically set up with the appropriate permissions for Messenger to use the SQS queue.

We now need to create the handler script (bin/consumer.php):

bin/consumer.php
<?php declare(strict_types=1);
 
use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer;
use Symfony\Component\Dotenv\Dotenv;
 
require dirname(__DIR__).'/vendor/autoload.php';
 
(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');
 
$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool)$_SERVER['APP_DEBUG']);
$kernel->boot();
 
// Return the Bref consumer service
return $kernel->getContainer()->get(SqsConsumer::class);

Finally, register and configure the SqsConsumer service:

config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            # Pass the transport name used in config/packages/messenger.yaml
            $transportName: 'async'
            $partialBatchFailure: true
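
The $partialBatchFailure argument appears to correspond to Lambda's partial batch response feature for SQS: instead of failing the whole batch when one message fails, the function reports only the failed message IDs. The sketch below illustrates that response format; it is not Bref's implementation, and handleBatch and its $process callback are hypothetical names:

```php
<?php declare(strict_types=1);

/**
 * Illustrative only: processes each record of an SQS Lambda event and builds
 * the partial batch response that tells Lambda which messages failed.
 *
 * @param array{Records: list<array{messageId: string, body: string}>} $event
 * @param callable(string): void $process hypothetical per-message callback
 */
function handleBatch(array $event, callable $process): array
{
    $failures = [];
    foreach ($event['Records'] as $record) {
        try {
            $process($record['body']);
        } catch (Throwable $e) {
            // Only this message is reported as failed and becomes eligible for retry.
            $failures[] = ['itemIdentifier' => $record['messageId']];
        }
    }

    return ['batchItemFailures' => $failures];
}
```

Without a partial batch response, an exception in one message causes Lambda to return the entire batch to the queue, so messages that were already processed successfully would be processed again.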

Error handling

AWS Lambda has error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrate Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism.

With the default Lift configuration, failed messages will be retried 3 times. You can configure this; see the Lift documentation to learn more.

With SNS and EventBridge, messages are retried 2 times by default.

FIFO queue

FIFO queues guarantee exactly-once processing and preserve the order of messages within a message group. Their names must end with the .fifo suffix.

With Lift, set fifo: true to enable it:

serverless.yml
constructs:
    my-queue:
        # ...
        fifo: true

Symfony Amazon SQS Messenger will automatically calculate/set the MessageGroupId and MessageDeduplicationId parameters required for FIFO queues, but you can set them explicitly:

use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
 
/** @var MessageBusInterface $messageBus */
$messageBus->dispatch(new MyAsyncMessage(), [
    new AmazonSqsFifoStamp('my-group-message-id', 'my-deduplication-id'),
]);

Everything else is identical to the normal SQS queue.
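
When setting the IDs explicitly, one option is to derive the deduplication ID from the message content, so that the same payload sent twice within the 5-minute SQS deduplication window is treated as a duplicate. The following is a sketch under that assumption; deduplicationIdFor is a hypothetical helper, and the 'orders' group ID in the usage comment is illustrative:

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper: derives a deterministic deduplication ID from a message payload.
 * A SHA-256 hex digest is 64 characters, within the 128-character limit that SQS
 * places on MessageDeduplicationId.
 */
function deduplicationIdFor(array $payload): string
{
    ksort($payload); // top-level key order does not affect the ID

    return hash('sha256', json_encode($payload, JSON_THROW_ON_ERROR));
}

// Usage with AmazonSqsFifoStamp (requires symfony/amazon-sqs-messenger):
// $messageBus->dispatch(new MyAsyncMessage(), [
//     new AmazonSqsFifoStamp('orders', deduplicationIdFor(['orderId' => 42])),
// ]);
```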

SNS

AWS SNS is a notification service rather than a queue. Messages may not arrive in the order they were sent, and they might arrive all at once. To use it, create an SNS topic and set it as the DSN:

MESSENGER_TRANSPORT_DSN=sns://arn:aws:sns:us-east-1:1234567890:foobar

That's it: messages will be dispatched to that topic.

💡

When running Symfony on AWS Lambda, it is not necessary to configure credentials. The AWS client will read them from environment variables automatically.

To consume messages from SNS:

  1. Create the function that will be invoked by SNS in serverless.yml:
serverless.yml
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        runtime: php-81
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sns/
            - sns:
                arn: arn:aws:sns:us-east-1:1234567890:my_sns_topic
  2. Create the handler script (for example bin/consumer.php):
bin/consumer.php
<?php declare(strict_types=1);
 
use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;
use Symfony\Component\Dotenv\Dotenv;
 
require dirname(__DIR__).'/vendor/autoload.php';
 
(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');
 
$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();
 
// Return the Bref consumer service
return $kernel->getContainer()->get(SnsConsumer::class);
  3. Register and configure the SnsConsumer service:
config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
        public: true
        autowire: true
        arguments:
            # Pass the transport name used in config/packages/messenger.yaml
            $transportName: 'async'

Now, anytime a message is dispatched to SNS, the Lambda function will be called. The Bref consumer class will put the message back into Symfony Messenger to be processed.
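
To make that flow more concrete, the sketch below shows the general shape of the event Lambda receives from SNS and how the message bodies could be read from it. This is not the Bref consumer's code; snsMessageBodies is a hypothetical function, and the sample payload is illustrative:

```php
<?php declare(strict_types=1);

/**
 * Illustrative only (not the Bref consumer): extracts the message bodies from
 * the event that Lambda receives when it is invoked by SNS.
 *
 * @return list<string>
 */
function snsMessageBodies(array $event): array
{
    $bodies = [];
    foreach ($event['Records'] ?? [] as $record) {
        // Each record wraps one notification under the "Sns" key.
        $bodies[] = $record['Sns']['Message'];
    }

    return $bodies;
}

// Trimmed-down example of an SNS invocation event.
$event = [
    'Records' => [[
        'EventSource' => 'aws:sns',
        'Sns' => [
            'TopicArn' => 'arn:aws:sns:us-east-1:1234567890:my_sns_topic',
            'Message' => '{"orderId":42}', // the serialized message is expected here
        ],
    ]],
];
```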

Error handling

AWS Lambda has error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrate Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism.

By default, Lambda will retry failed messages 2 times.

EventBridge

AWS EventBridge is a message routing service. It is similar to SNS, but more powerful for communication between microservices.

To use it, configure the DSN like so:

# "myapp" is the EventBridge "source", i.e. a namespace for your application's messages
# This source name will be reused in `serverless.yml` later.
MESSENGER_TRANSPORT_DSN=eventbridge://myapp

Optionally, you can set the EventBusName via an event_bus_name query parameter, using either the bus name or its ARN:

MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=custom-bus
MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=arn:aws:events:us-east-1:123456780912:event-bus/custom-bus

That's it: messages will be dispatched to EventBridge.
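
As a sketch of how such a DSN breaks down, the following splits it into the source and the optional bus name using PHP's built-in URL functions. This is not Bref's own parser, and describeEventBridgeDsn is a hypothetical helper:

```php
<?php declare(strict_types=1);

/**
 * Illustrative only (not Bref's own parser): shows which part of an
 * eventbridge:// DSN is the source and which is the optional bus name.
 *
 * @return array{source: string, eventBusName: ?string}
 */
function describeEventBridgeDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || ($parts['scheme'] ?? '') !== 'eventbridge' || !isset($parts['host'])) {
        throw new InvalidArgumentException("Not an EventBridge DSN: $dsn");
    }

    // The query string is optional; an empty string yields an empty array.
    parse_str($parts['query'] ?? '', $query);

    return [
        'source' => $parts['host'],
        'eventBusName' => $query['event_bus_name'] ?? null,
    ];
}
```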

💡

When running Symfony on AWS Lambda, it is not necessary to configure credentials. The AWS client will read them from environment variables automatically.

To consume messages from EventBridge:

  1. Create the function that will be invoked by EventBridge in serverless.yml:
serverless.yml
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        runtime: php-81
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/event-bridge/
            -   eventBridge:
                    # If you set a custom bus name in the transport DSN (e.g. eventbridge://myapp?event_bus_name=custom-bus), set the same bus name here:
                    # eventBus: custom-bus
                    # This filters events we listen to: only events from the "myapp" source.
                    # This should be the same source defined in config/packages/messenger.yaml
                    pattern:
                        source:
                            - myapp
  2. Create the handler script (for example bin/consumer.php):
bin/consumer.php
<?php declare(strict_types=1);
 
use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer;
use Symfony\Component\Dotenv\Dotenv;
 
require dirname(__DIR__).'/vendor/autoload.php';
 
(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');
 
$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();
 
// Return the Bref consumer service
return $kernel->getContainer()->get(EventBridgeConsumer::class);
  3. Register and configure the EventBridgeConsumer service:
config/services.yaml
services:
    Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer:
        public: true
        autowire: true
        arguments:
            # Pass the transport name used in config/packages/messenger.yaml
            $transportName: 'async'
            # Optionally, if you have different buses in config/packages/messenger.yaml, set $bus like below:
            # $bus: '@event.bus'

Now, anytime a message is dispatched to EventBridge for that source, the Lambda function will be called. The Bref consumer class will put the message back into Symfony Messenger to be processed.
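
The source filter in serverless.yml decides which events reach the function. EventBridge applies that pattern itself before invoking Lambda; the sketch below only mimics the matching rule on a trimmed-down event to show what "same source" means. matchesSourcePattern is a hypothetical function, and the detail-type and detail values are illustrative:

```php
<?php declare(strict_types=1);

/**
 * Illustrative only: mimics the "source" filter from serverless.yml.
 *
 * @param list<string> $allowedSources values listed under pattern.source
 */
function matchesSourcePattern(array $event, array $allowedSources): bool
{
    // An event matches when its source is one of the listed values.
    return in_array($event['source'] ?? null, $allowedSources, true);
}

// Trimmed-down EventBridge event, as delivered to the Lambda function.
$event = [
    'source' => 'myapp',
    'detail-type' => 'Example message', // hypothetical value
    'detail' => ['orderId' => 42],      // hypothetical payload
];
```

If the source in the DSN and the source in the pattern differ, no event matches and the worker is never invoked.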

Error handling

AWS Lambda has error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrate Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism.

By default, Lambda will retry failed messages 2 times.

Configuration

Configuring AWS clients

By default, AWS clients (SQS, SNS, EventBridge) are preconfigured to work on AWS Lambda (thanks to environment variables populated by AWS Lambda).

However, it is possible to customize the AWS clients, for example to use them outside of AWS Lambda (locally, on EC2…) or to mock them in tests. These clients are registered as Symfony services under the keys:

  • bref.messenger.sqs_client
  • bref.messenger.sns_client
  • bref.messenger.eventbridge_client

For example to customize the SQS client:

services:
    bref.messenger.sqs_client:
        class: AsyncAws\Sqs\SqsClient
        public: true # the AWS clients must be public
        arguments:
            # Apply your own config here
            -
                region: us-east-1

Disabling transports

By default, this package registers Symfony Messenger transports for SQS, SNS and EventBridge.

If you want to disable some transports (for example in case of conflict), you can remove BrefMessengerBundle from config/bundles.php and reconfigure the transports you want in your application's config. Take a look at Resources/config/services.yaml to copy the part that you want.

Customizing the serializer

If you want to change how messages are serialized, for example to use the Happyr message serializer, you need to add the serializer on both the transport and the consumer. For example:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
                serializer: 'Happyr\MessageSerializer\Serializer'
 
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            $transportName: 'async'
            $serializer: '@Happyr\MessageSerializer\Serializer'
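
The serializer has to be set on both sides because the transport encodes the message when it is dispatched and the consumer decodes it on the worker. The sketch below illustrates that round trip with two hypothetical functions, encodeMessage and decodeMessage. They are not the Happyr or Symfony API; they only mirror the body/headers array shape used by Symfony's SerializerInterface:

```php
<?php declare(strict_types=1);

// Illustrative only: two hypothetical functions standing in for the serializer
// used by the transport (encode) and by the consumer (decode).

function encodeMessage(string $type, array $data): array
{
    return [
        'body' => json_encode($data, JSON_THROW_ON_ERROR),
        'headers' => ['type' => $type],
    ];
}

function decodeMessage(array $encoded): array
{
    // Throws JsonException if the body was produced in a different format,
    // which is what a serializer mismatch between transport and consumer looks like.
    $data = json_decode($encoded['body'], true, 512, JSON_THROW_ON_ERROR);

    return ['type' => $encoded['headers']['type'], 'data' => $data];
}
```

If only the transport used a custom format, the consumer would try to decode the body with the default serializer and fail.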