AWS Asynchronous Messaging with SNS and SQS

Introduction to AWS Asynchronous Messaging with SNS and SQS

The AWS asynchronous messaging pattern combines Amazon Simple Notification Service (SNS) for publish/subscribe (pub/sub) messaging with Amazon Simple Queue Service (SQS) for message queuing, so that microservices stay decoupled, scalable, and resilient. SNS lets a publisher send a message once to a topic and have it delivered to every subscriber, while SQS holds messages in durable queues until a consumer processes them, which supports retries and background tasks. The pattern underpins event-driven workflows and integrates with AWS services such as AWS Lambda, Amazon ECS, and AWS Step Functions for use cases including microservices communication, event notifications, and asynchronous task processing.

SNS and SQS together provide a robust framework for decoupling microservices and enabling asynchronous, fault-tolerant workflows.
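
As a minimal sketch of the publishing side, the snippet below builds the arguments for an SNS publish call. The function name, the topic ARN, and the order payload are hypothetical; the eventType attribute is an assumption chosen to match the filter policies used in the configuration examples later on this page.

```python
import json


def build_publish_request(topic_arn: str, event_type: str, payload: dict) -> dict:
    """Build keyword arguments for an SNS publish call (illustrative helper)."""
    return {
        "TopicArn": topic_arn,
        "Message": json.dumps(payload),
        # Message attributes travel alongside the body; subscription filter
        # policies match against them by default.
        "MessageAttributes": {
            "eventType": {"DataType": "String", "StringValue": event_type}
        },
    }


if __name__ == "__main__":
    request = build_publish_request(
        "arn:aws:sns:us-west-2:123456789012:MyMessageTopic",  # hypothetical ARN
        "order_created",
        {"orderId": "A-1001", "total": 42.5},
    )
    print(json.dumps(request, indent=2))
    # With boto3 installed and AWS credentials configured, the request
    # could then be sent with:
    #   import boto3
    #   boto3.client("sns").publish(**request)
```

Keeping the request construction separate from the network call makes the message shape easy to unit test without AWS access.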

Asynchronous Messaging Architecture Diagram

The diagram illustrates the asynchronous messaging workflow. Publishers (e.g., applications, IoT devices) send messages to an SNS topic, which fans out to multiple subscribers, including SQS queues, Lambda functions, and HTTP endpoints. SQS queues store messages for processing by consumers such as Lambda functions, ECS tasks, or Step Functions workflows, and a dead-letter queue (DLQ) captures messages that repeatedly fail processing. CloudWatch monitors metrics and logs, while CI/CD pipelines manage configurations. Arrows are color-coded: blue for message publishing, red for fan-out to subscribers, green for queue processing and writes to data stores, yellow for the dead-letter path, and purple for monitoring.

graph TD
    %% Styling for nodes
    classDef publisher fill:#42a5f5,stroke:#1e88e5,stroke-width:2px,rx:5,ry:5;
    classDef sns fill:#ff6f61,stroke:#c62828,stroke-width:2px,color:#ffffff,rx:5,ry:5;
    classDef sqs fill:#2ecc71,stroke:#1b5e20,stroke-width:2px,color:#ffffff,rx:5,ry:5;
    classDef consumer fill:#fbc02d,stroke:#f9a825,stroke-width:2px,rx:5,ry:5;
    classDef monitoring fill:#9c27b0,stroke:#6a1b9a,stroke-width:2px,color:#ffffff,rx:5,ry:5;
    classDef storage fill:#8e44ad,stroke:#6a1b9a,stroke-width:2px,color:#ffffff,rx:5,ry:5;

    %% Nodes and edges
    A[Publishers - Apps / IoT / APIs] -->|Publish Messages| B[SNS Topic]
    B -->|Fan-Out| C[SQS Queue]
    B --> D[Lambda Subscriber]
    B --> E[HTTP Endpoint]
    C --> F[Lambda Consumer]
    C --> G[ECS Task]
    C --> H[Step Function]
    C --> I[SQS Dead-Letter Queue]
    F --> J[S3 Storage]
    F --> K[DynamoDB Table]
    B --> L[CloudWatch Monitoring]
    C --> L
    M[CI/CD Pipeline] --> B
    M --> C

    %% Subgraphs
    subgraph Event_Sources
        A
    end
    subgraph Messaging_Layer
        B
        C
        I
    end
    subgraph Subscribers
        D
        E
    end
    subgraph Processing_Layer
        F
        G
        H
    end
    subgraph Data_Stores
        J
        K
    end
    subgraph Monitoring
        L
        M
    end

    %% Class assignments
    class A publisher;
    class B sns;
    class C,I sqs;
    class D,F,G,H consumer;
    class J,K storage;
    class L,M monitoring;

    %% Link styles
    linkStyle 0 stroke:#405de6,stroke-width:2.5px;
    linkStyle 1,2,3 stroke:#ff6f61,stroke-width:2.5px;
    linkStyle 4,5,6 stroke:#2ecc71,stroke-width:2.5px;
    linkStyle 7 stroke:#fbc02d,stroke-width:2.5px;
    linkStyle 8,9 stroke:#2ecc71,stroke-width:2.5px;
    linkStyle 10,11 stroke:#9c27b0,stroke-width:2.5px;

SNS and SQS decouple publishers and consumers, enabling scalable and resilient asynchronous workflows.

Key Components

The SNS and SQS asynchronous messaging architecture relies on the following components:

  • Amazon SNS: Managed pub/sub messaging service for broadcasting messages to multiple subscribers via topics.
  • Amazon SQS: Managed message queuing service for storing and processing messages, supporting standard and FIFO queues.
  • SNS Topics: Channels for publishing messages, supporting subscriptions to SQS, Lambda, HTTP, or email.
  • SQS Queues: Durable queues for storing messages, with features like visibility timeouts and dead-letter queues.
  • Publishers: Applications, IoT devices, or APIs (e.g., running on EC2, Lambda) that send messages to SNS topics.
  • Consumers: Services like Lambda, ECS tasks, or Step Functions that process messages from SQS queues.
  • AWS Lambda: Serverless compute service that processes messages as they arrive, either as a direct SNS subscriber or by polling an SQS queue through an event source mapping.
  • Amazon ECS: Container orchestration service for running consumer tasks that process SQS messages.
  • AWS Step Functions: Orchestrates multi-step workflows (sequential or parallel) started from SQS messages, typically through an intermediary such as EventBridge Pipes or a Lambda function that starts the execution.
  • Dead-Letter Queues (DLQs): SQS queues for storing unprocessed messages after retry attempts, enabling error analysis.
  • Amazon S3: Stores processed data or message outputs for archival or analytics.
  • Amazon DynamoDB: NoSQL database for low-latency storage of processed message data.
  • Amazon CloudWatch: Monitors SNS and SQS metrics (e.g., message throughput, queue depth) and logs.
  • AWS IAM: Secures access to SNS topics, SQS queues, and consumer services with least-privilege policies.
  • Amazon VPC: Provides network isolation for ECS tasks and other consumers, which can reach SNS and SQS privately through VPC endpoints.
  • CI/CD Pipeline: Manages SNS/SQS configurations and consumer application deployments, for example with AWS CodePipeline.
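
When a queue is subscribed to a topic without raw message delivery, each SQS message body is a JSON envelope written by SNS, and the publisher's payload sits in its Message field. The sketch below shows how a consumer might unwrap it. The function name and sample values are hypothetical, and it assumes the publisher sent a JSON payload with an eventType message attribute.

```python
import json
from typing import Tuple


def unwrap_sns_envelope(sqs_body: str) -> Tuple[str, dict]:
    """Return (event_type, payload) from an SNS notification stored in SQS."""
    envelope = json.loads(sqs_body)
    # In the envelope, each message attribute is an object with "Type" and "Value".
    attributes = envelope.get("MessageAttributes", {})
    event_type = attributes.get("eventType", {}).get("Value", "")
    # The original published message is carried as a string in "Message".
    payload = json.loads(envelope["Message"])
    return event_type, payload


if __name__ == "__main__":
    sample_body = json.dumps({
        "Type": "Notification",
        "TopicArn": "arn:aws:sns:us-west-2:123456789012:MyMessageTopic",  # hypothetical
        "Message": json.dumps({"orderId": "A-1001"}),
        "MessageAttributes": {
            "eventType": {"Type": "String", "Value": "order_created"}
        },
    })
    print(unwrap_sns_envelope(sample_body))
```

If raw message delivery is enabled on the subscription, the body is the published message itself and this unwrapping step is not needed.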

Benefits of AWS SNS and SQS Messaging Pattern

The SNS and SQS asynchronous messaging pattern offers significant advantages:

  • Decoupling: Separates publishers and consumers, enabling independent scaling and development.
  • Scalability: SNS and SQS scale automatically to handle high message volumes.
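  • Reliability: SQS stores messages durably until a consumer deletes them. Standard queues provide at-least-once delivery, so consumers should be idempotent, and DLQs capture messages that repeatedly fail.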
  • Flexibility: Supports multiple subscriber types (SQS, Lambda, HTTP) and queue types (standard, FIFO).
  • Cost Efficiency: Pay-per-use pricing minimizes costs for variable workloads.
  • Fault Tolerance: SQS retries and DLQs handle message processing failures gracefully.
  • Integration: Seamless connectivity with AWS services like Lambda, ECS, and Step Functions.
  • Simplified Management: Fully managed services reduce operational overhead for messaging infrastructure.
  • Observability: CloudWatch integration provides insights into message throughput, latency, and errors.
  • Security: IAM policies, encryption, and VPC endpoints ensure secure message transmission.
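
Because a standard queue can deliver the same message more than once, consumers usually need to tolerate duplicates. The following is a minimal sketch of one way to do that, assuming each message carries a stable identifier (for instance the SNS MessageId or a business key). The class name is hypothetical, and the in-memory set stands in for a durable store such as a DynamoDB table.

```python
class IdempotentProcessor:
    """Skip messages whose identifier has already been handled (illustrative)."""

    def __init__(self, handler):
        self._handler = handler
        # In-memory only: a real consumer would need a store that survives
        # restarts and is shared between consumer instances.
        self._seen = set()

    def process(self, message_id: str, body: str) -> bool:
        """Run the handler once per message_id; return True if it ran."""
        if message_id in self._seen:
            return False
        self._handler(body)
        # Record the ID only after the handler succeeds, so that a failed
        # attempt can be retried when the message is redelivered.
        self._seen.add(message_id)
        return True


if __name__ == "__main__":
    processor = IdempotentProcessor(print)
    processor.process("msg-1", "order A-1001 created")
    processor.process("msg-1", "order A-1001 created")  # duplicate, skipped
```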

Implementation Considerations

Implementing the SNS and SQS messaging pattern requires addressing key considerations:

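  • Queue Type Selection: Choose standard queues for high throughput with best-effort ordering, or FIFO queues for strict ordering and deduplication at lower throughput. A FIFO queue must subscribe to a FIFO topic.
  • Message Size: Keep messages within the 256 KB SNS limit; for larger payloads, store the body in S3 and send a reference pointer.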
  • Security: Use IAM policies, KMS encryption, and VPC endpoints to secure SNS topics and SQS queues.
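  • Retry and DLQ Setup: Set the visibility timeout longer than the expected processing time, and configure a DLQ with a maxReceiveCount so that repeatedly failing messages are set aside rather than retried forever.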
  • Consumer Optimization: Batch process messages in Lambda or ECS to reduce costs and improve performance.
  • Monitoring: Track CloudWatch metrics such as queue depth (ApproximateNumberOfMessagesVisible) and message age (ApproximateAgeOfOldestMessage), alarm on DLQ depth, and monitor consumer errors.
  • Cost Management: Use Cost Explorer to track SNS/SQS usage and optimize message batching.
  • Testing: Validate message flows with tools like AWS CLI or SDKs to ensure reliable delivery.
  • Scalability Planning: Configure auto-scaling for ECS consumers or Lambda concurrency limits for peak loads.
  • Compliance: Enable CloudTrail for auditing SNS/SQS operations to meet standards like SOC 2 or HIPAA.
  • Message Filtering: Use SNS message filtering to route specific messages to relevant subscribers.
  • Documentation: Maintain clear documentation for topic/queue configurations and consumer logic.
Proper queue configuration and security practices ensure reliable and secure asynchronous messaging.
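
The message-size consideration above is often handled by storing oversized payloads elsewhere and sending only a reference. The sketch below illustrates the idea under stated assumptions: the function name, the payloads/ key prefix, and the payloadPointer field are hypothetical, and store_large_payload is any callable that writes the body somewhere durable (for example a thin wrapper around an S3 upload).

```python
import json
import uuid

SNS_MAX_BYTES = 256 * 1024  # the limit also covers message attributes


def prepare_message(payload: dict, store_large_payload, max_bytes: int = SNS_MAX_BYTES) -> str:
    """Return a message body, offloading it if it exceeds max_bytes."""
    body = json.dumps(payload)
    # Compare encoded size, since the limit is in bytes, not characters.
    if len(body.encode("utf-8")) <= max_bytes:
        return body
    key = f"payloads/{uuid.uuid4()}.json"  # hypothetical key layout
    store_large_payload(key, body)
    # Consumers look for this field and fetch the real payload by key.
    return json.dumps({"payloadPointer": key})


if __name__ == "__main__":
    stored = {}
    small = prepare_message({"orderId": "A-1001"}, stored.__setitem__)
    large = prepare_message({"blob": "x" * 300_000}, stored.__setitem__)
    print(small)
    print(large, "stored objects:", len(stored))
```

A real publisher would leave some headroom below the limit for message attributes, which this sketch does not measure.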

Example Configuration: CloudFormation Template for SNS and SQS

Below is a CloudFormation template to provision an SNS topic, an SQS queue with a subscription, and a dead-letter queue.

AWSTemplateFormatVersion: '2010-09-09'
Description: Provisions an SNS topic, SQS queue with subscription, and dead-letter queue
Resources:
  MyDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: MyDeadLetterQueue
      Tags:
        - Key: Environment
          Value: production
  MySQSQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: MyMessageQueue
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt MyDLQ.Arn
        maxReceiveCount: 5
      Tags:
        - Key: Environment
          Value: production
  MySNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: MyMessageTopic
      Tags:
        - Key: Environment
          Value: production
  SQSSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: sqs
      TopicArn: !Ref MySNSTopic
      Endpoint: !GetAtt MySQSQueue.Arn
      FilterPolicy:
        eventType:
          - order_created
          - payment_processed
  SQSQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref MySQSQueue
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: sns.amazonaws.com
            Action: sqs:SendMessage
            Resource: !GetAtt MySQSQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref MySNSTopic
Outputs:
  TopicArn:
    Value: !Ref MySNSTopic
  QueueUrl:
    Value: !Ref MySQSQueue
  DLQUrl:
    Value: !Ref MyDLQ
                
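This CloudFormation template sets up an SNS topic, an SQS queue subscribed to it, and a dead-letter queue that receives messages after five failed receives. The filter policy delivers only messages whose eventType message attribute is order_created or payment_processed, and the queue policy allows only this topic to send to the queue.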
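
To make the effect of the filter policy concrete, here is a simplified local emulation of how it decides whether a message reaches the queue. It is a sketch, not the SNS implementation: it handles only exact string matching, whereas real filter policies also support operators such as prefix and numeric matching. The function name is hypothetical.

```python
def matches_filter_policy(policy: dict, attributes: dict) -> bool:
    """Return True if every policy key is present with an allowed value.

    policy maps attribute names to lists of accepted strings;
    attributes maps attribute names to the message's string values.
    """
    for name, allowed_values in policy.items():
        # A missing attribute yields None, which is never in the allowed list.
        if attributes.get(name) not in allowed_values:
            return False
    return True


if __name__ == "__main__":
    policy = {"eventType": ["order_created", "payment_processed"]}
    for attrs in ({"eventType": "order_created"}, {"eventType": "order_shipped"}, {}):
        print(attrs, "delivered" if matches_filter_policy(policy, attrs) else "filtered out")
```

Messages that do not match are dropped for this subscription only; other subscribers to the same topic still receive them according to their own policies.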

Example Configuration: AWS CDK in Python for SNS, SQS, and Lambda

Below is an AWS CDK stack in Python to provision an SNS topic, an SQS queue, and a Lambda consumer.

# Written for AWS CDK v2 (the aws-cdk-lib and constructs packages).
from aws_cdk import (
    App,
    Duration,
    Stack,
    Tags,
    aws_lambda as lambda_,
    aws_lambda_event_sources as event_sources,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    aws_sqs as sqs,
)
from constructs import Construct


class SNSSQSStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Define SQS dead-letter queue; keep failed messages for 14 days
        dlq = sqs.Queue(
            self, "MyDeadLetterQueue",
            queue_name="MyDeadLetterQueue",
            retention_period=Duration.days(14)
        )

        # Define SQS queue; messages move to the DLQ after 5 failed receives
        queue = sqs.Queue(
            self, "MyMessageQueue",
            queue_name="MyMessageQueue",
            visibility_timeout=Duration.seconds(30),
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=5,
                queue=dlq
            )
        )

        # Define SNS topic
        topic = sns.Topic(
            self, "MyMessageTopic",
            topic_name="MyMessageTopic"
        )

        # Subscribe SQS to SNS. SqsSubscription also adds the queue policy
        # that lets this topic send messages to the queue.
        topic.add_subscription(
            subscriptions.SqsSubscription(
                queue,
                filter_policy={
                    "eventType": sns.SubscriptionFilter.string_filter(
                        allowlist=["order_created", "payment_processed"]
                    )
                }
            )
        )

        # Define Lambda consumer. Inline code is deployed as index.py,
        # so the handler must be "index.<function name>".
        lambda_consumer = lambda_.Function(
            self, "SQSConsumerLambda",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="index.lambda_handler",
            code=lambda_.Code.from_inline("""
import json

def lambda_handler(event, context):
    for record in event['Records']:
        print("Received SQS message:", json.loads(record['body']))
    return {"statusCode": 200}
"""),
            # Must not exceed the queue's visibility timeout
            timeout=Duration.seconds(30),
            environment={
                "QUEUE_URL": queue.queue_url
            }
        )

        # Connect the queue to the function. The event source mapping polls
        # the queue and grants the function permission to consume messages.
        lambda_consumer.add_event_source(
            event_sources.SqsEventSource(queue, batch_size=10)
        )

        # Add tags
        Tags.of(queue).add("Environment", "production")
        Tags.of(topic).add("Environment", "production")
        Tags.of(dlq).add("Environment", "production")


app = App()
SNSSQSStack(app, "SNSSQSStack")
app.synth()
                
This CDK stack provisions an SNS topic, an SQS queue with a DLQ, and a Lambda consumer for processing messages.
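
When Lambda reads from SQS in batches, a single bad message can cause the whole batch to be retried. One way to avoid that is a partial batch response, sketched below. It assumes the event source mapping has ReportBatchItemFailures enabled and that each record body is an SNS envelope; process_record and the orderId check are hypothetical placeholders for real business logic.

```python
import json


def process_record(body: str) -> None:
    """Placeholder business logic: raise on anything that cannot be handled."""
    envelope = json.loads(body)
    payload = json.loads(envelope["Message"])
    if "orderId" not in payload:
        raise ValueError("payload has no orderId")


def lambda_handler(event, context):
    """Process each SQS record and report only the failed ones."""
    failures = []
    for record in event["Records"]:
        try:
            process_record(record["body"])
        except Exception:
            # Only the listed message IDs are returned to the queue;
            # records not listed are treated as processed and deleted.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


if __name__ == "__main__":
    good = json.dumps({"Message": json.dumps({"orderId": "A-1001"})})
    sample_event = {"Records": [
        {"messageId": "m-1", "body": good},
        {"messageId": "m-2", "body": "not json"},
    ]}
    print(lambda_handler(sample_event, None))
```

Without ReportBatchItemFailures enabled on the mapping, Lambda ignores this return value and treats any successful invocation as a fully processed batch.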

Example Configuration: Terraform for SNS, SQS, and ECS

Below is a Terraform configuration to provision an SNS topic, an SQS queue with a DLQ, and an ECS cluster and Fargate task definition for message processing.

provider "aws" {
  region = "us-west-2"
}

resource "aws_sns_topic" "message_topic" {
  name = "MyMessageTopic"
  tags = {
    Environment = "production"
  }
}

resource "aws_sqs_queue" "dead_letter_queue" {
  name = "MyDeadLetterQueue"
  tags = {
    Environment = "production"
  }
}

resource "aws_sqs_queue" "message_queue" {
  name = "MyMessageQueue"
  visibility_timeout_seconds = 30
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dead_letter_queue.arn
    maxReceiveCount     = 5
  })
  tags = {
    Environment = "production"
  }
}

resource "aws_sns_topic_subscription" "sqs_subscription" {
  topic_arn = aws_sns_topic.message_topic.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.message_queue.arn
  filter_policy = jsonencode({
    eventType = ["order_created", "payment_processed"]
  })
}

resource "aws_sqs_queue_policy" "queue_policy" {
  queue_url = aws_sqs_queue.message_queue.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "sns.amazonaws.com"
        }
        Action = "sqs:SendMessage"
        Resource = aws_sqs_queue.message_queue.arn
        Condition = {
          ArnEquals = {
            "aws:SourceArn" = aws_sns_topic.message_topic.arn
          }
        }
      }
    ]
  })
}

resource "aws_ecs_cluster" "consumer_cluster" {
  name = "MessageConsumerCluster"
  tags = {
    Environment = "production"
  }
}

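# Log group referenced by the container's awslogs configuration
resource "aws_cloudwatch_log_group" "consumer_logs" {
  name = "/ecs/message-consumer"
  tags = {
    Environment = "production"
  }
}

resource "aws_ecs_task_definition" "consumer_task" {
  family                   = "MessageConsumerTask"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "256"
  memory                   = "512"
  # The execution role is used by ECS itself (image pull, log delivery);
  # the task role is used by the application code inside the container.
  execution_role_arn = aws_iam_role.ecs_execution_role.arn
  task_role_arn      = aws_iam_role.ecs_task_role.arn
  container_definitions = jsonencode([
    {
      name      = "consumer-container"
      image     = "amazon/amazon-ecs-sample" # placeholder; replace with the consumer image
      essential = true
      environment = [
        {
          name  = "QUEUE_URL"
          value = aws_sqs_queue.message_queue.id
        }
      ]
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          awslogs-group         = aws_cloudwatch_log_group.consumer_logs.name
          awslogs-region        = "us-west-2"
          awslogs-stream-prefix = "consumer"
        }
      }
    }
  ])
  tags = {
    Environment = "production"
  }
}

# Execution role: lets ECS write container logs on the task's behalf
resource "aws_iam_role" "ecs_execution_role" {
  name = "ecs-execution-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "ecs_logs_policy" {
  name = "ecs-logs-policy"
  role = aws_iam_role.ecs_execution_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "${aws_cloudwatch_log_group.consumer_logs.arn}:*"
      }
    ]
  })
}

# Task role: grants the consumer code access to the queue
resource "aws_iam_role" "ecs_task_role" {
  name = "ecs-task-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "ecs_sqs_policy" {
  name = "ecs-sqs-policy"
  role = aws_iam_role.ecs_task_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.message_queue.arn
      }
    ]
  })
}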
                
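This Terraform configuration provisions an SNS topic, an SQS queue with a DLQ, and an ECS task definition for processing messages. The amazon/amazon-ecs-sample image is a placeholder for a real consumer image, and running the consumer additionally requires an ECS service or scheduled task with network configuration, which is not shown here.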
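
Unlike Lambda, a container-based consumer has to poll the queue itself. The sketch below shows one polling pass using long polling, under the assumption that the container receives a boto3 SQS client and the QUEUE_URL environment variable. The names poll_once, handle_body, and InMemoryQueueStub are hypothetical; the stub only exists so the example runs without AWS access.

```python
def poll_once(sqs_client, queue_url, handler, max_messages=10, wait_seconds=20):
    """Receive one batch, process it, and delete the messages that succeed."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows at most 10 per call
        WaitTimeSeconds=wait_seconds,      # long polling, up to 20 seconds
    )
    processed = 0
    for message in response.get("Messages", []):
        try:
            handler(message["Body"])
        except Exception:
            # Do not delete: the message becomes visible again after the
            # visibility timeout and moves to the DLQ after maxReceiveCount.
            continue
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
        )
        processed += 1
    return processed


class InMemoryQueueStub:
    """Stand-in for a boto3 SQS client, for local demonstration only."""

    def __init__(self, bodies):
        self.messages = [
            {"Body": body, "ReceiptHandle": f"rh-{i}"}
            for i, body in enumerate(bodies)
        ]

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        return {"Messages": self.messages[:MaxNumberOfMessages]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.messages = [
            m for m in self.messages if m["ReceiptHandle"] != ReceiptHandle
        ]


def handle_body(body):
    """Placeholder handler that rejects one specific body."""
    if body == "bad":
        raise ValueError("cannot process message")


if __name__ == "__main__":
    stub = InMemoryQueueStub(["ok-1", "bad", "ok-2"])
    print("processed:", poll_once(stub, "stub-queue-url", handle_body))
    print("left on queue:", [m["Body"] for m in stub.messages])
    # In the container, the stub would be replaced with a real client:
    #   import os, boto3
    #   poll_once(boto3.client("sqs"), os.environ["QUEUE_URL"], handle_body)
```

A long-running consumer would call poll_once in a loop and stop cleanly when ECS signals the task to shut down.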