Synchronous operations block responses. Celery moves time-consuming tasks to background workers, keeping Django applications responsive. At ZIRA Software, Celery powers background processing for healthcare platforms handling thousands of daily jobs.
Why Celery?
Without Celery:
- Long-running tasks block HTTP responses
- Email sending delays page loads
- Report generation times out
- No retry mechanism
With Celery:
- Immediate HTTP responses
- Background processing
- Automatic retries
- Scheduled periodic tasks
Installation
pip install celery redis
Configure Celery:
# myproject/celery.py
import os

from celery import Celery

# Ensure Django settings are loaded before the Celery app is configured
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
# Read every CELERY_-prefixed setting from Django's settings module
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in all installed apps
app.autodiscover_tasks()

# myproject/__init__.py
# Load the Celery app whenever Django starts so shared_task binds to it
from .celery import app as celery_app
__all__ = ('celery_app',)
Django settings:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
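Two optional additions that pair well with this setup (a suggestion, not a requirement): track when tasks actually start, and enforce a global runtime cap. Thanks to the CELERY_ namespace configured above, both map directly to Celery's task_track_started and task_time_limit settings.
# settings.py (optional)
CELERY_TASK_TRACK_STARTED = True  # Expose a STARTED state while a task runs
CELERY_TASK_TIME_LIMIT = 30 * 60  # Hard-kill any task after 30 minutes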
Creating Tasks
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import Order

@shared_task
def send_order_confirmation(order_id):
    """Send order confirmation email."""
    order = Order.objects.get(id=order_id)
    send_mail(
        subject=f'Order Confirmation #{order.id}',
        message=f'Thank you for your order, {order.customer.name}!',
        from_email='contact@zirasoftware.com',
        recipient_list=[order.customer.email],
    )
    return f'Email sent to {order.customer.email}'
@shared_task
def generate_report(report_id):
    """Generate PDF report."""
    from django.core.files.base import ContentFile
    from .models import Report

    report = Report.objects.get(id=report_id)
    report.status = 'processing'
    report.save()
    try:
        # Generate PDF (expensive operation); generate_pdf is your own helper
        pdf_content = generate_pdf(report.data)
        report.file.save(f'report_{report_id}.pdf', ContentFile(pdf_content))
        report.status = 'completed'
        report.save()
    except Exception as e:
        report.status = 'failed'
        report.error_message = str(e)
        report.save()
        raise
Calling Tasks
Synchronous (regular function):
result = send_order_confirmation(order.id) # Blocks
Asynchronous (Celery task):
# Fire and forget
send_order_confirmation.delay(order.id)

# With options
send_order_confirmation.apply_async(
    args=[order.id],
    countdown=60,  # Delay execution by 60 seconds
    expires=3600,  # Discard if not started within 1 hour
)
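Because CELERY_RESULT_BACKEND is configured above, the AsyncResult that delay() returns can be inspected later:
result = send_order_confirmation.delay(order.id)
result.id       # Task UUID; safe to store and look up later
result.ready()  # True once the task has finished
result.get(timeout=10)  # Blocks for the return value
Avoid calling get() inside a view; it blocks the request, which defeats the point of offloading the work.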
Task Retries
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):
    try:
        order = Order.objects.get(id=order_id)
        charge_customer(order)  # placeholder payment-gateway call
    except PaymentGatewayError as exc:
        # Retry after 5 minutes, up to max_retries attempts
        raise self.retry(exc=exc, countdown=300)
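Modern Celery can also retry declaratively. A variant sketch of the same task using autoretry_for with exponential backoff (process_payment_auto and the gateway calls are placeholders):
@shared_task(
    autoretry_for=(PaymentGatewayError,),
    retry_backoff=True,       # 1s, 2s, 4s, ... between attempts
    retry_backoff_max=600,    # Cap the delay at 10 minutes
    retry_kwargs={'max_retries': 5},
)
def process_payment_auto(order_id):
    charge_customer(Order.objects.get(id=order_id))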
Periodic Tasks
Install django-celery-beat:
pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
    # ...
    'django_celery_beat',
]
# Periodic task schedule
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=9, minute=0),  # 9:00 AM daily
    },
    'cleanup-old-sessions': {
        'task': 'myapp.tasks.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0),  # 2:00 AM daily
    },
    'check-subscriptions': {
        'task': 'myapp.tasks.check_subscription_renewals',
        'schedule': crontab(minute=0),  # Every hour, on the hour
    },
}
Run migrations:
python manage.py migrate django_celery_beat
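For the schedule to fire, the referenced tasks must exist in myapp/tasks.py. A minimal sketch of send_daily_report, continuing the tasks.py shown earlier; the summary helper and recipient address are placeholders:
# myapp/tasks.py
@shared_task
def send_daily_report():
    summary = build_daily_summary()  # hypothetical helper
    send_mail(
        subject='Daily report',
        message=summary,
        from_email='contact@zirasoftware.com',
        recipient_list=['team@zirasoftware.com'],  # placeholder address
    )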
Task Chains
Execute tasks in sequence:
from celery import chain

# Each task receives the previous task's return value as its first argument
result = chain(
    process_order.s(order_id),
    send_confirmation.s(),
    update_inventory.s(),
)()
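For the chain to work, each task's return value must match the next task's first parameter. A sketch of compatible signatures, with the names assumed from the chain above:
@shared_task
def process_order(order_id):
    # ... do the work ...
    return order_id  # Becomes the first argument to send_confirmation

@shared_task
def send_confirmation(order_id):
    # ... send the email ...
    return order_id  # Passed on to update_inventory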
Task Groups
Execute tasks in parallel:
from celery import group
from django.contrib.auth.models import User

# Process multiple emails concurrently
job = group(
    send_email.s(user.email)
    for user in User.objects.filter(is_active=True)
)
result = job.apply_async()
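apply_async() on a group returns a GroupResult; join() gathers every task's return value in order. It blocks, so call it from a script or another task, not a request handler:
# List of return values, in the same order as the group's tasks
all_results = result.join(timeout=30)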
Monitoring with Flower
Install Flower:
pip install flower
Run Flower:
celery -A myproject flower
# Access at http://localhost:5555
Features:
- Real-time task monitoring
- Task history
- Worker statistics
- Task routing visualization
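Flower has no authentication by default, so anywhere beyond local development it should sit behind some form of auth. One built-in option is HTTP basic auth (credentials here are placeholders):
celery -A myproject flower --basic_auth=admin:changeme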
Production Deployment
Start Celery worker:
celery -A myproject worker -l info
Start Celery beat (scheduler):
celery -A myproject beat -l info
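If you manage schedules through django-celery-beat's admin rather than CELERY_BEAT_SCHEDULE, start beat with the database scheduler:
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler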
Supervisor configuration:
[program:myproject-celery]
command=/path/to/venv/bin/celery -A myproject worker -l info
directory=/path/to/project
user=www-data
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
[program:myproject-celerybeat]
command=/path/to/venv/bin/celery -A myproject beat -l info
directory=/path/to/project
user=www-data
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true
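As job volume grows, a single shared queue lets slow report generation starve quick email sends. A sketch of per-queue routing; the queue names are assumptions:
# settings.py
CELERY_TASK_ROUTES = {
    'myapp.tasks.generate_report': {'queue': 'reports'},
    'myapp.tasks.send_order_confirmation': {'queue': 'emails'},
}
Then run a dedicated worker per queue:
celery -A myproject worker -Q reports -l info
celery -A myproject worker -Q emails -l info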
Best Practices
1. Keep tasks idempotent:
@shared_task
def process_order(order_id):
    order = Order.objects.get(id=order_id)
    # Check if already processed
    if order.status == 'processed':
        return 'Already processed'
    # Process order...
    order.status = 'processed'
    order.save()
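The status check above still races if two workers receive duplicate deliveries at the same moment. A sketch that locks the row first, assuming a transactional database such as PostgreSQL:
from django.db import transaction

@shared_task
def process_order(order_id):
    with transaction.atomic():
        # Concurrent duplicates block on the row lock, then see 'processed'
        order = Order.objects.select_for_update().get(id=order_id)
        if order.status == 'processed':
            return 'Already processed'
        # Process order...
        order.status = 'processed'
        order.save()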
2. Handle exceptions:
import logging

logger = logging.getLogger(__name__)

@shared_task
def risky_operation():
    try:
        dangerous_api_call()
    except Exception as e:
        logger.error(f'Task failed: {e}')
        raise  # Re-raise so Celery records the failure
3. Set task timeouts:
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(time_limit=300, soft_time_limit=250)
def long_running_task():
    try:
        do_work()  # placeholder for the real workload
    except SoftTimeLimitExceeded:
        # Raised inside the task at 250 seconds; the worker
        # hard-kills the task at 300 seconds regardless
        cleanup()  # placeholder for graceful cleanup
Conclusion
Celery moves slow work out of Django's request/response cycle: background processing, scheduling, and retry logic all become straightforward to add and operate.
Building Django applications with complex background tasks? Contact ZIRA Software for Python/Django expertise.