April 17, 2026 · 7 min read · Infrastructure & Architecture

Building Idempotent Data Pipelines on AWS

Reprocessing the same file creates duplicate records. A network timeout triggers a retry that processes data twice. A Lambda function gets invoked twice by the same event. These aren’t edge cases. They’re your pipeline’s default behavior.

Idempotency, the ability to replay an operation without producing additional side effects, is the difference between a pipeline and a nightmare.

What Idempotency Really Means

Idempotency means: running the same operation once, twice, or a hundred times produces the same result. The operation might execute multiple times, but the state changes only once.

In data pipelines, this means:

  • Processing the same invoice twice doesn’t create two line-item rows
  • Retrying a failed Lambda invocation doesn’t double-count metrics
  • Reloading last month’s data doesn’t corrupt aggregate reports

Without idempotency, every retry, every network hiccup, every “let me run this again” risks corrupting your data lake.
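
A toy illustration of the difference, using plain Python containers in place of a real data store. The function names and the in-memory structures are made up for this sketch: appending a row is not idempotent, while writing under a deterministic key is.

```python
# Hypothetical in-memory "tables" standing in for a real data store.
rows_appended: list[dict] = []
rows_keyed: dict[str, dict] = {}


def append_row(row: dict) -> None:
    """Not idempotent: every call adds another row."""
    rows_appended.append(row)


def upsert_row(row: dict) -> None:
    """Idempotent: a replay overwrites the same key with the same value."""
    rows_keyed[row["invoice_number"]] = row


invoice = {"invoice_number": "INV-2024-001", "amount": 1500}

# Simulate the same invoice being delivered three times.
for _ in range(3):
    append_row(invoice)
    upsert_row(invoice)

print(len(rows_appended))  # 3
print(len(rows_keyed))     # 1
```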

Content-Based Deduplication: The Foundation

Start with deterministic identification. Use content hashing to generate unique IDs for incoming data.

    import hashlib
    import json

    def generate_content_id(data: dict) -> str:
        """Generate a deterministic ID from data content."""
        # Sort keys to ensure consistent hashing
        content = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    # Example: invoice file
    invoice_data = {
        "invoice_number": "INV-2024-001",
        "date": "2024-04-01",
        "amount": 1500.00,
        "vendor": "Acme Corp"
    }

    content_id = generate_content_id(invoice_data)
    # Same data always produces the same ID

This ID becomes your record’s unique identifier. It’s based entirely on content. Same invoice → same ID. Different invoice → different ID.
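
One caveat worth checking in your own data: the ID is only as stable as the serialization behind it. The sketch below (the `content_id` helper is a hypothetical variant that also fixes the JSON separators) suggests that key order does not matter, but a value that arrives as `1500` in one file and `1500.0` in another hashes differently, so values may need normalizing before hashing.

```python
import hashlib
import json


def content_id(data: dict) -> str:
    """Hash a canonical JSON rendering of the record."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = {"invoice_number": "INV-2024-001", "amount": 1500.0}
b = {"amount": 1500.0, "invoice_number": "INV-2024-001"}  # same data, different key order
c = {"invoice_number": "INV-2024-001", "amount": 1500}    # int instead of float

same_despite_order = content_id(a) == content_id(b)
same_despite_type = content_id(a) == content_id(c)

print(same_despite_order)  # True
print(same_despite_type)   # False
```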

DynamoDB Conditional Writes: Enforcing Uniqueness

Once you have a content ID, use DynamoDB’s conditional writes to guarantee exactly-once insertion.

    import time
    from decimal import Decimal

    import boto3
    from botocore.exceptions import ClientError

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('invoices')  # content_id is the partition key

    def insert_idempotent(content_id: str, data: dict) -> bool:
        """Insert data only if content_id doesn't already exist."""
        try:
            table.put_item(
                Item={
                    'content_id': content_id,
                    'invoice_number': data['invoice_number'],
                    # boto3 rejects Python floats; numbers must be Decimal
                    'amount': Decimal(str(data['amount'])),
                    'timestamp': int(time.time()),
                    'status': 'processed'
                },
                ConditionExpression='attribute_not_exists(content_id)'
            )
            return True  # First time seeing this content
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                return False  # Already processed this content
            raise

The ConditionExpression ensures the write only succeeds if the content_id doesn’t exist. If it does, the operation fails—but safely. You can detect this and handle it gracefully.

Key insight: DynamoDB conditional writes are atomic. There’s no race condition between checking and writing. Two Lambda invocations processing the same event simultaneously? Only one wins. The other gets a ConditionalCheckFailedException—which is exactly what you want.
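
To see the "only one wins" behavior without an AWS account, here is a small simulation. It is not DynamoDB: `FakeTable`, `put_if_absent`, and `ConditionalCheckFailed` are invented stand-ins, and a lock plays the role of the atomic check-and-write that DynamoDB performs on the server side.

```python
import threading


class ConditionalCheckFailed(Exception):
    """Stand-in for DynamoDB's ConditionalCheckFailedException."""


class FakeTable:
    """In-memory stand-in for a table with an atomic put-if-absent."""

    def __init__(self) -> None:
        self._items: dict[str, dict] = {}
        self._lock = threading.Lock()

    def put_if_absent(self, key: str, item: dict) -> None:
        # The lock makes the check and the write a single atomic step.
        with self._lock:
            if key in self._items:
                raise ConditionalCheckFailed(key)
            self._items[key] = item


table = FakeTable()
outcomes: list[bool] = []
outcomes_lock = threading.Lock()


def worker() -> None:
    """Try to insert the same content ID; record whether this thread won."""
    try:
        table.put_if_absent("abc123", {"status": "processed"})
        won = True
    except ConditionalCheckFailed:
        won = False
    with outcomes_lock:
        outcomes.append(won)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(outcomes.count(True))   # 1
print(outcomes.count(False))  # 7
```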

Exactly-Once Processing with Distributed IDs

For high-volume pipelines, hashing the full content of every record can be expensive. Where the source already provides stable identifiers, use a distributed ID scheme built from them instead:

  • For files: {bucket}/{prefix}/{filename}/{hash(file_content)}
  • For API events: {source}/{event_type}/{timestamp}/{sequence_number}
  • For time-series data: {metric_name}/{dimension_key}/{timestamp_hour}
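
As a rough sketch, the three formats above could be built with small helper functions. The function names and argument shapes are assumptions for illustration; the point is that each ID is derived only from stable inputs, so a replay produces the same string.

```python
from datetime import datetime, timezone


def file_tracking_id(bucket: str, key: str, content_hash: str) -> str:
    """{bucket}/{prefix}/{filename}/{hash}; the object key holds prefix and filename."""
    return f"{bucket}/{key}/{content_hash}"


def event_tracking_id(source: str, event_type: str,
                      timestamp: int, sequence_number: int) -> str:
    """{source}/{event_type}/{timestamp}/{sequence_number}"""
    return f"{source}/{event_type}/{timestamp}/{sequence_number}"


def metric_tracking_id(metric_name: str, dimension_key: str, ts: datetime) -> str:
    """Truncate to the UTC hour so every point in that hour shares one ID."""
    hour = ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H")
    return f"{metric_name}/{dimension_key}/{hour}"


t1 = datetime(2024, 4, 1, 13, 5, tzinfo=timezone.utc)
t2 = datetime(2024, 4, 1, 13, 55, tzinfo=timezone.utc)

print(metric_tracking_id("cpu", "host-1", t1))  # cpu/host-1/2024-04-01T13
print(metric_tracking_id("cpu", "host-1", t1) == metric_tracking_id("cpu", "host-1", t2))  # True
```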

Store these IDs in a fast lookup table (DynamoDB with TTL) to detect replays:

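    import time

    import boto3

    dynamodb = boto3.resource('dynamodb')
    tracking_table = dynamodb.Table('processing_tracking')

    def has_been_processed(tracking_id: str) -> bool:
        """Check if we've already processed this tracking_id."""
        # Let lookup errors propagate: treating a failed read as
        # "not processed" would cause duplicate processing.
        response = tracking_table.get_item(
            Key={'tracking_id': tracking_id},
            ConsistentRead=True  # Don't miss a marker that was just written
        )
        return 'Item' in response

    def mark_as_processed(tracking_id: str):
        """Mark this tracking_id as processed. Auto-expire after 30 days."""
        now = int(time.time())
        # Expiry only happens if TTL is enabled on the 'ttl' attribute
        tracking_table.put_item(
            Item={
                'tracking_id': tracking_id,
                'processed_at': now,
                'ttl': now + (30 * 86400)  # 30 days
            }
        )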

Lambda Patterns for Idempotent Processing

Structure your Lambda functions to separate idempotency from business logic:

    def lambda_handler(event, context):
        # Must be stable across retries of the same message
        tracking_id = event['message_id']

        # Check if already processed
        if has_been_processed(tracking_id):
            return {'statusCode': 200, 'message': 'Already processed'}

        # Do the actual work. If anything below raises, the exception
        # propagates, nothing is marked as processed, and the next
        # invocation retries.
        result = process_data(event)

        # Persist result atomically
        store_result(tracking_id, result)

        # Mark as processed
        mark_as_processed(tracking_id)

        return {'statusCode': 200, 'data': result}

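Key insight: Only mark something as processed after it’s successfully stored. If processing fails, the next retry can try again. Because a crash between storing and marking leads to one more run, the store step itself must tolerate repetition, for example by writing the result under the tracking ID as its key.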
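
The ordering can be exercised locally with a small simulation. Everything here is a stand-in: a set replaces the tracking table, a dict replaces the results store, and `flaky_process` is hypothetical business logic that fails on its first call.

```python
processed: set[str] = set()    # stands in for the tracking table
results: dict[str, dict] = {}  # stands in for the results store
attempts = {"count": 0}


def flaky_process(event: dict) -> dict:
    """Hypothetical business logic that fails on its first call."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("simulated timeout")
    return {"total": event["amount"] * 2}


def handler(event: dict) -> str:
    tracking_id = event["message_id"]
    if tracking_id in processed:
        return "skipped"
    result = flaky_process(event)
    results[tracking_id] = result  # keyed write: repeating it is harmless
    processed.add(tracking_id)     # only reached after the result is stored
    return "processed"


event = {"message_id": "msg-1", "amount": 21}
log = []
for _ in range(3):
    try:
        log.append(handler(event))
    except RuntimeError:
        log.append("failed")

print(log)  # ['failed', 'processed', 'skipped']
```

The failed first attempt leaves no marker behind, so the second delivery does the work and the third is skipped.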

S3 Event Notifications: A Practical Example

Here’s a complete pipeline for processing files uploaded to S3:

  1. File arrives → S3 PUT event fires
  2. Lambda is invoked → receives S3 event notification
  3. Generate content ID from file metadata (bucket, key, etag)
  4. Check tracking table → have we seen this before?
  5. If yes → return immediately (idempotent success)
  6. If no → process file, store results to DynamoDB
  7. Mark as processed → record content ID with TTL

    from urllib.parse import unquote_plus

    def lambda_handler(event, context):
        s3_event = event['Records'][0]['s3']
        bucket = s3_event['bucket']['name']
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(s3_event['object']['key'])
        etag = s3_event['object']['eTag']

        # Deterministic tracking ID
        tracking_id = f"{bucket}#{key}#{etag}"

        if has_been_processed(tracking_id):
            return 'Skipped (already processed)'

        # Process file
        data = process_s3_file(bucket, key)
        store_results(data)
        mark_as_processed(tracking_id)

        return 'Processed successfully'

Even if S3 sends the same event twice (notifications are delivered at least once, so it can happen), your Lambda handles it gracefully.
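
Two details of the notification payload are easy to miss: `Records` is a list, and object keys arrive URL-encoded (a space shows up as `+`). A minimal sketch of building one tracking ID per record, assuming object-created events (which carry an `eTag`); the helper name and the sample event are made up for illustration.

```python
from urllib.parse import unquote_plus


def tracking_ids_from_s3_event(event: dict) -> list[str]:
    """Build one bucket#key#etag tracking ID per record in the notification."""
    ids = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        bucket = s3["bucket"]["name"]
        key = unquote_plus(s3["object"]["key"])  # decode the URL-encoded key
        etag = s3["object"]["eTag"]
        ids.append(f"{bucket}#{key}#{etag}")
    return ids


sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-bucket"},
                "object": {"key": "invoices/april+report.csv", "eTag": "abc123"}}},
    ]
}

print(tracking_ids_from_s3_event(sample_event))
# ['my-bucket#invoices/april report.csv#abc123']
```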

Testing Idempotency

Before deploying, test it:

    def test_idempotent_processing():
        """Verify processing twice gives same result."""
        tracking_id = 'test-123'
        data = {'value': 100}

        # First invocation
        result1 = process_and_track(tracking_id, data)

        # Second invocation (replay)
        result2 = process_and_track(tracking_id, data)

        assert result1 == result2  # Results are identical

        # Verify only one record exists
        count = count_records_with_id(tracking_id)
        assert count == 1
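
The test above relies on `process_and_track` and `count_records_with_id`, which are not defined in this post. One way to run the same check locally is with in-memory versions of both helpers; the implementations below are assumptions for illustration, with a list as the store so that duplicates would be possible if the replay check were missing.

```python
_records: list[dict] = []  # in-memory store that could hold duplicates


def process_and_track(tracking_id: str, data: dict) -> dict:
    """Hypothetical helper: compute once per tracking_id, reuse the stored result on replay."""
    for record in _records:
        if record["tracking_id"] == tracking_id:
            return record["result"]
    result = {"value": data["value"] * 2}
    _records.append({"tracking_id": tracking_id, "result": result})
    return result


def count_records_with_id(tracking_id: str) -> int:
    """Count stored records for this tracking_id."""
    return sum(1 for record in _records if record["tracking_id"] == tracking_id)


def test_replay_returns_same_result() -> None:
    first = process_and_track("test-123", {"value": 100})
    second = process_and_track("test-123", {"value": 100})  # replay
    assert first == second
    assert count_records_with_id("test-123") == 1


test_replay_returns_same_result()
```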

Why This Matters

Idempotency isn’t a feature. It’s a prerequisite. In distributed systems, failures are guaranteed. Retries are guaranteed. The only question is: does your pipeline handle them?

Without idempotency, every retry risks corrupting your data. With it, retries are safe. Recovery is automatic.