@adamcharnock
Last active July 1, 2017 15:41

Checklist

  • Celery report:
```
$ celery -A celery_tasks report

software -> celery:4.0.2 (latentcall) kombu:4.0.2 py:3.6.1
            billiard:3.5.0.2 py-amqp:2.1.4
platform -> system:Darwin arch:64bit imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:redis://localhost/

broker_url: 'amqp://guest:********@localhost:5672//'
result_backend: 'redis://localhost/'
```
  • Running against master branch

Steps to reproduce

```shell
# Execute 10 tasks, then wait for all results
$ python consumer_parallel.py 10
Time per call: 18.59ms

# Execute 10 tasks, each time waiting for the result before continuing
$ python consumer_serial.py 10
Time per call: 1492.49ms
```

(Scripts below)

Expected behavior

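A reasonable latency between dispatching a single task and receiving its result, i.e. not orders of magnitude worse than the per-call time seen when tasks are dispatched in parallel.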

Actual behavior

Calling tasks serially is roughly 80x slower per call than calling them in parallel (1492.49ms vs 18.59ms).
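Each script prints the total wall-clock time of one run divided by the number of tasks (10 here), so the totals and the ratio between the two runs can be recovered directly from the printed figures:

```python
# Per-call figures printed by the two scripts (ms); each is total time / N.
N = 10
parallel_per_call_ms = 18.59
serial_per_call_ms = 1492.49

# Recover the total wall-clock time of each run.
parallel_total_ms = parallel_per_call_ms * N
serial_total_ms = serial_per_call_ms * N

# Ratio of serial to parallel per-call time.
slowdown = serial_per_call_ms / parallel_per_call_ms

print(f"parallel total: {parallel_total_ms:.1f} ms")
print(f"serial total:   {serial_total_ms:.1f} ms")
print(f"slowdown:       {slowdown:.1f}x")
```

This prints totals of about 185.9 ms and 14924.9 ms and a slowdown of about 80.3x. Note that the whole 10-task parallel run finishes in less time than a single serial round trip.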

Motivation

My motivation is to determine (and minimise) the per-execution latency when waiting for the result of a task.

When tasks are executed in parallel, the per-call time is low. However, performance drops drastically when executing a single task and waiting for its result before sending the next one.
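The scripts below report a single mean (`timeit` over one run, divided by N). That number cannot show whether every call is slow or whether a few calls dominate, for instance a one-off cost on the first call. Below is a minimal sketch of a harness that times each call individually and reports min/median/max/mean. `simulated_round_trip` is a hypothetical stand-in that lets the sketch run without a broker. Against a real worker you would pass something like `lambda: test_task.delay().get()` instead.

```python
import statistics
import time
from typing import Callable, Dict, List


def measure_latencies(call: Callable[[], object], n: int) -> List[float]:
    """Run `call` n times in sequence and return each call's duration in ms."""
    latencies = []
    for _ in range(n):
        start = time.perf_counter()
        call()  # e.g. lambda: test_task.delay().get() against a real worker
        latencies.append((time.perf_counter() - start) * 1000)
    return latencies


def summarize(latencies: List[float]) -> Dict[str, float]:
    """Summary statistics (ms) for a list of per-call latencies."""
    return {
        "min": min(latencies),
        "median": statistics.median(latencies),
        "max": max(latencies),
        "mean": statistics.mean(latencies),
    }


def simulated_round_trip() -> int:
    # Hypothetical stand-in for a task round trip: about 5 ms of waiting.
    time.sleep(0.005)
    return 1


if __name__ == "__main__":
    stats = summarize(measure_latencies(simulated_round_trip, 10))
    for name, value in stats.items():
        print(f"{name:>6}: {value:.2f}ms")
```

A large gap between median and max would suggest a few slow outliers. A uniformly high median would suggest every round trip pays the same delay.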

Scripts

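```python
# celery_tasks.py
from celery import Celery

# Results are stored in Redis; RabbitMQ (via py-amqp) is the broker.
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://guest@localhost//')


@app.task
def test_task():
    # Trivial task, so measured time is dominated by dispatch and result retrieval.
    return 1
```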
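```python
# consumer_parallel.py
# Dispatch all tasks first, then wait for every result.
import sys
import timeit

from celery_tasks import test_task

TOTAL_MESSAGES = int(sys.argv[1])


def do_it():
    # Send every task without waiting...
    results = [test_task.delay() for _ in range(TOTAL_MESSAGES)]
    # ...then block until each result is available.
    for result in results:
        result.get()


# Total wall-clock time for one run, divided by the number of tasks.
seconds = timeit.timeit(do_it, number=1) / TOTAL_MESSAGES
print('Time per call: {}ms'.format(round(seconds * 1000, 2)))
```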
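```python
# consumer_serial.py
# Dispatch one task at a time, waiting for its result before sending the next.
import sys
import timeit

from celery_tasks import test_task

TOTAL_MESSAGES = int(sys.argv[1])


def do_it():
    for _ in range(TOTAL_MESSAGES):
        result = test_task.delay()
        result.get()  # block until this task's result arrives


# Total wall-clock time for one run, divided by the number of tasks.
seconds = timeit.timeit(do_it, number=1) / TOTAL_MESSAGES
print('Time per call: {}ms'.format(round(seconds * 1000, 2)))
```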
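The two scripts measure slightly different things. The parallel script divides the wall-clock time of overlapping waits by N, so its per-call figure reflects throughput. The serial script measures one full round trip per call. The toy model below illustrates the difference using a thread pool and a fixed per-call latency. It does not involve Celery, and `fake_task` and `LATENCY_S` are hypothetical values chosen only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical fixed round-trip latency for one simulated "task" (seconds).
LATENCY_S = 0.05


def fake_task() -> int:
    """Stand-in for dispatching a task and waiting for its result."""
    time.sleep(LATENCY_S)
    return 1


def serial_per_call(n: int) -> float:
    """Wait for each call before starting the next (like consumer_serial.py)."""
    start = time.perf_counter()
    for _ in range(n):
        fake_task()
    return (time.perf_counter() - start) / n


def parallel_per_call(n: int) -> float:
    """Start all calls, then wait for all results (like consumer_parallel.py)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(fake_task) for _ in range(n)]
        for future in futures:
            future.result()
    return (time.perf_counter() - start) / n


if __name__ == "__main__":
    n = 10
    print(f"serial:   {serial_per_call(n) * 1000:.2f}ms per call")
    print(f"parallel: {parallel_per_call(n) * 1000:.2f}ms per call")
```

Under this model the serial figure is about `LATENCY_S` and the parallel figure is about `LATENCY_S / n`. Amortization alone therefore predicts at most an n-fold (here 10x) difference between the two printed numbers.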