Documentation Index
Fetch the complete documentation index at: https://mintlify.com/alphagov/notifications-api/llms.txt
Use this file to discover all available pages before exploring further.
GOV.UK Notify uses Celery with Amazon SQS as the message broker for asynchronous task processing. The task architecture handles notification delivery, job processing, scheduled tasks, and callbacks.
Queue Architecture
Location: app/config.py:12
Queue Definitions
class QueueNames:
PERIODIC = "periodic-tasks" # Scheduled/cron tasks
DATABASE = "database-tasks" # DB writes from jobs
SEND_SMS = "send-sms-tasks" # SMS delivery
SEND_EMAIL = "send-email-tasks" # Email delivery
SEND_LETTER = "send-letter-tasks" # Letter delivery
RESEARCH_MODE = "research-mode-tasks" # Test mode sends
REPORTING = "reporting-tasks" # Analytics/billing
JOBS = "job-tasks" # Job processing
RETRY = "retry-tasks" # Failed task retries
NOTIFY = "notify-internal-tasks" # Internal operations
CREATE_LETTERS_PDF = "create-letters-pdf-tasks"
CALLBACKS = "service-callbacks" # Service callbacks
CALLBACKS_RETRY = "service-callbacks-retry"
LETTERS = "letter-tasks" # Letter operations
SES_CALLBACKS = "ses-callbacks" # Email provider callbacks
SMS_CALLBACKS = "sms-callbacks" # SMS provider callbacks
LETTER_CALLBACKS = "letter-callbacks" # Letter provider callbacks
ANTIVIRUS = "antivirus-tasks" # External AV service
SANITISE_LETTERS = "sanitise-letter-tasks" # External sanitisation
Queue Configuration
CELERY = {
"broker_url": "https://sqs.eu-west-1.amazonaws.com",
"broker_transport": "sqs",
"task_ignore_result": True,
"broker_transport_options": {
"region": "eu-west-1",
"queue_name_prefix": NOTIFICATION_QUEUE_PREFIX,
"predefined_queues": QueueNames.predefined_queues(),
},
}
Core Tasks
Job Processing Tasks
Location: app/celery/tasks.py
process-job
@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None, shatter_batch_size=32):
"""
Main task to process a bulk notification job.
Flow:
1. Fetch job and validate status
2. Check service is active
3. Verify sending limits not exceeded
4. Read CSV from S3
5. Batch rows and create shatter-job-rows tasks
6. Mark job as finished
"""
Process:
- Updates job status to
in progress
- Validates service is active
- Checks daily sending limits
- Reads recipient CSV from S3
- Batches rows (default 32) and creates subtasks
- Each batch sent to
shatter-job-rows task
- Marks job as
finished
shatter-job-rows
@notify_celery.task(name="shatter-job-rows")
def shatter_job_rows(template_type: str, args_kwargs_seq: Sequence):
"""
Takes a batch of job rows and creates individual notification tasks.
Uses subdivision if SQS message size limit exceeded.
"""
for task_args_kwargs in args_kwargs_seq:
process_job_row(template_type, task_args_kwargs)
Features:
- Handles SQS message size limits via recursive subdivision
- Routes to appropriate save task based on notification type
- Queue:
JOBS
Notification Saving Tasks
Location: app/celery/tasks.py:275
save-sms
@notify_celery.task(
bind=True,
name="save-sms",
max_retries=5,
default_retry_delay=300
)
def save_sms(self, service_id, notification_id, encoded_notification, sender_id=None):
"""
Persists SMS notification to database and queues for delivery.
"""
Flow:
- Decode signed notification data
- Load service and template
- Validate recipient format
- Persist notification to database
- Queue
deliver_sms task to SEND_SMS
Error Handling:
- Invalid phone numbers marked as
validation-failed
- SQLAlchemy errors trigger retry with exponential backoff
- Max retries: 5, delay: 300s
save-email
@notify_celery.task(
bind=True,
name="save-email",
max_retries=5,
default_retry_delay=300
)
def save_email(self, service_id, notification_id, encoded_notification, sender_id=None):
"""
Persists email notification and queues for delivery.
"""
Special Handling:
- Adds email file attachment links to personalisation
- Handles reply-to address configuration
- Unsubscribe link generation if enabled
save-letter
@notify_celery.task(
bind=True,
name="save-letter",
max_retries=5,
default_retry_delay=300
)
def save_letter(self, service_id, notification_id, encoded_notification):
"""
Persists letter notification and triggers PDF generation.
"""
Flow:
- Parse postal address from personalisation
- Determine postage (first/second class, international)
- Persist notification
- Queue
get_pdf_for_templated_letter to CREATE_LETTERS_PDF
Provider Delivery Tasks
Location: app/celery/provider_tasks.py
deliver_sms
@notify_celery.task(
bind=True,
name="deliver_sms",
max_retries=48,
default_retry_delay=300
)
def deliver_sms(self, notification_id):
"""
Sends SMS to provider (MMG or Firetext).
Retries: 48 times (4 days)
"""
Error Handling:
SmsClientResponseException → retry
- Max retries exceeded → mark as
technical-failure
- First retry: immediate (countdown=0)
- Subsequent retries: 300s delay
- Queue:
RETRY for all retries
deliver_email
@notify_celery.task(
bind=True,
name="deliver_email",
max_retries=48,
default_retry_delay=300
)
def deliver_email(self, notification_id):
"""
Sends email via AWS SES.
"""
Special Cases:
EmailClientNonRetryableException → immediate technical-failure (no retry)
AwsSesClientThrottlingSendRateException → retry with warning
- Generic exceptions → retry
deliver_letter
@notify_celery.task(
bind=True,
name="deliver_letter",
max_retries=55
)
def deliver_letter(self, notification_id):
"""
Sends letter to DVLA print provider.
"""
Process:
- Validate notification status is
created
- Fetch PDF from S3
- Send to DVLA API with callback URL
- Update status to
sending
Error Types:
DvlaThrottlingException → retry with warning
DvlaRetryableException → retry
DvlaDuplicatePrintRequestException → mark sending (idempotent)
- Other exceptions →
technical-failure
Scheduled Tasks
Location: app/config.py:282 (beat_schedule)
High-Frequency Tasks (Every Minute)
"generate-sms-delivery-stats": {
"task": "generate-sms-delivery-stats",
"schedule": crontab(), # Every minute
},
"switch-current-sms-provider-on-slow-delivery": {
"task": "switch-current-sms-provider-on-slow-delivery",
"schedule": crontab(),
},
"check-job-status": {
"task": "check-job-status",
"schedule": crontab(),
},
Periodic Tasks
"run-scheduled-jobs": {
"task": "run-scheduled-jobs",
"schedule": crontab(minute="0,15,30,45"), # Every 15 mins
},
"tend-providers-back-to-middle": {
"task": "tend-providers-back-to-middle",
"schedule": crontab(minute="*/5"), # Every 5 mins
},
"replay-created-notifications": {
"task": "replay-created-notifications",
"schedule": crontab(minute="0,15,30,45"),
},
Nightly Tasks
"timeout-sending-notifications": {
"task": "timeout-sending-notifications",
"schedule": crontab(hour=0, minute=5),
},
"create-nightly-billing": {
"task": "create-nightly-billing",
"schedule": crontab(hour=0, minute=15),
},
"create-nightly-notification-status": {
"task": "create-nightly-notification-status",
"schedule": crontab(hour=0, minute=30),
},
"delete-notifications-older-than-retention": {
"task": "delete-notifications-older-than-retention",
"schedule": crontab(hour=3, minute=0),
},
Letter Collation
"check-time-to-collate-letters": {
"task": "check-time-to-collate-letters",
"schedule": crontab(hour="16,17", minute=50), # 4:50pm and 5:50pm UTC
# Task checks if local time is 5:50pm BST before proceeding
},
Weekly/Monthly Tasks
"weekly-dwp-report": {
"task": "weekly-dwp-report",
"schedule": crontab(hour=9, minute=0, day_of_week="mon"),
},
"change-dvla-api-key": {
"task": "change-dvla-api-key",
"schedule": crontab(hour=9, minute=0, day_of_week="tue", day_of_month="1-7"),
# First Tuesday of every month
},
Task Design Patterns
Retry Strategy
@notify_celery.task(bind=True, max_retries=48, default_retry_delay=300)
def my_task(self, notification_id):
try:
do_work(notification_id)
except Exception as e:
if self.request.retries == 0:
self.retry(queue=QueueNames.RETRY, countdown=0) # Immediate
else:
self.retry(queue=QueueNames.RETRY) # 300s delay
Error Handling Hierarchy
- Non-retryable → Immediate technical-failure
- Retryable → Exponential backoff
- Max retries → Technical-failure with logging
Idempotency
All tasks check for duplicate processing:
if not get_notification_by_id(notification_id):
# Already processed, safe to skip
return
Task Signing
Sensitive data encoded with signing module:
encoded = signing.encode({
"template": str(template_id),
"to": recipient,
"personalisation": {...}
})
Batching
- Job rows batched (default 32) before task creation
- Reduces SQS API calls
- Subdivision if message too large
Queue Separation
- Delivery tasks separated by channel (SMS/Email/Letter)
- Prevents head-of-line blocking
- Independent scaling per queue
Prefetch Multiplier
if os.getenv("CELERYD_PREFETCH_MULTIPLIER"):
CELERY["worker_prefetch_multiplier"] = os.getenv("CELERYD_PREFETCH_MULTIPLIER")
Set to 1 for long-running tasks to prevent timeout issues.
Monitoring
Tasks emit StatsD metrics:
self.statsd_client.incr(f"clients.{provider}.success")
self.statsd_client.timing(f"clients.{provider}.request-time", elapsed)
Logging with structured context:
current_app.logger.info(
"SMS notification %(notification_id)s created",
extra={"notification_id": notification_id}
)
app/celery/tasks.py - Job and notification tasks
app/celery/provider_tasks.py - Provider delivery tasks
app/celery/scheduled_tasks.py - Scheduled/periodic tasks
app/celery/nightly_tasks.py - Nightly maintenance tasks
app/celery/reporting_tasks.py - Analytics tasks
app/config.py - Queue and schedule configuration