Code Snippets for Implementing Asynchronous Tasks with Celery in Python

How can I implement asynchronous tasks using Celery in Python? Provide some code snippets to illustrate the process.

1 Answer

✓ Best Answer

Asynchronous Tasks with Celery in Python 🚀

Celery is a powerful asynchronous task queue that allows you to execute tasks outside the main application flow. This is particularly useful for time-consuming or resource-intensive operations, preventing them from blocking the user interface or main process.

Setting Up Celery 🛠️

First, you need to install Celery and a message broker (like Redis or RabbitMQ). Using pip, install Celery:

pip install celery

For Redis, also install the Python client library (the Redis server itself must be installed and running separately):

pip install redis

Basic Celery Configuration ⚙️

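Create a celery_app.py file in your project (a top-level file named celery.py would shadow the Celery library, so the import below would fail when the file is run directly):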


from celery import Celery

app = Celery('my_tasks', 
             broker='redis://localhost:6379/0',  # Replace with your broker URL
             backend='redis://localhost:6379/0') # Replace with your backend URL

app.conf.update(
    result_expires=3600, # Task result expiration time (1 hour)
)

if __name__ == '__main__':
    app.start()

Defining Celery Tasks 📝

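Create a tasks.py file to define your Celery tasks. The app is declared in this file so the worker command works as written; it needs a result backend as well as a broker, otherwise task results cannot be retrieved later:


import time

from celery import Celery

app = Celery('my_tasks',
             broker='redis://localhost:6379/0',   # Replace with your broker URL
             backend='redis://localhost:6379/0')  # Needed to store task results

@app.task
def add(x, y):
    return x + y

@app.task
def long_running_task():
    # Simulate a long-running task
    time.sleep(10)
    return 'Task completed!'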

Running Celery Worker 🏃

Open a new terminal and start the Celery worker:

celery -A tasks worker --loglevel=INFO

Calling Tasks 📞

In your Python code, import the tasks and call them asynchronously using .delay(). The call returns an AsyncResult immediately instead of waiting for the task's return value:


from tasks import add, long_running_task

result = add.delay(4, 5)
print(f'Task ID: {result.id}')

long_task_result = long_running_task.delay()
print(f'Long Task ID: {long_task_result.id}')

Checking Task Status and Results ✅

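You can check the status and retrieve results using the task ID. This only works when the app has a result backend configured, so pass the app explicitly:


from celery.result import AsyncResult

from tasks import app

task_id = 'your_task_id_here'  # The ID printed when the task was queued
result = AsyncResult(task_id, app=app)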

if result.ready():
    print(f'Task result: {result.get()}')
else:
    print('Task is still processing...')

Example Use Cases 💡

  • Image Processing: Resize or convert images in the background.
  • Sending Emails: Send welcome emails or notifications asynchronously.
  • Data Processing: Process large datasets without blocking the main application.
