Code Snippets for Implementing Asynchronous Tasks with Celery in Python
How can I implement asynchronous tasks using Celery in Python? Provide some code snippets to illustrate the process.
Celery is a powerful asynchronous task queue that allows you to execute tasks outside the main application flow. This is particularly useful for time-consuming or resource-intensive operations, preventing them from blocking the user interface or main process.
First, install Celery and choose a message broker (such as Redis or RabbitMQ; note that the broker server itself runs separately and must be started on its own). Install Celery with pip:
pip install celery
If you use Redis as the broker, also install its Python client:
pip install redis
Create a celery.py file in your project (a caveat: if this file sits at the top level of your project rather than inside a package, name it something else, such as celery_app.py, because a top-level celery.py shadows the celery package and breaks the import below):
from celery import Celery

app = Celery('my_tasks',
             broker='redis://localhost:6379/0',   # replace with your broker URL
             backend='redis://localhost:6379/0')  # replace with your result backend URL

app.conf.update(
    result_expires=3600,  # task results expire after one hour
)

if __name__ == '__main__':
    app.start()
Create a tasks.py file to define your Celery tasks:
from celery import Celery

# Include the backend here as well, so the worker can store results
# and the calling code can retrieve them later
app = Celery('my_tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task
def long_running_task():
    # Simulate a long-running task
    import time
    time.sleep(10)
    return 'Task completed!'
From the directory containing tasks.py, open a new terminal and start the Celery worker:
celery -A tasks worker --loglevel=INFO
In your Python code, import the tasks and call them asynchronously using .delay():
from tasks import add, long_running_task
result = add.delay(4, 5)
print(f'Task ID: {result.id}')
long_task_result = long_running_task.delay()
print(f'Long Task ID: {long_task_result.id}')
You can check the status and retrieve results using the task ID:
from celery.result import AsyncResult
from tasks import app  # bind the result to the app with the configured backend

task_id = 'your_task_id_here'
result = AsyncResult(task_id, app=app)
if result.ready():
    print(f'Task result: {result.get()}')
else:
    print('Task is still processing...')