Playing with Chords – Celery

TL;DR

In this post, I will walk through how to use the “chord” structure in Celery.

The Basics

A chord is one of the more complex workflow primitives in Celery. It is a combination of two things –

  1. Groups – A group combines several independent tasks so that they can run in parallel. In a previous post, we looked at how Groups, Loops and Parallelism work.
  2. Callbacks – A callback is a function that is executed after another function has finished.
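To make the two ingredients concrete, here is a minimal sketch in plain Python that does not use Celery at all. It only mimics the shape of a chord with a thread pool, and the names fetch_length and summarize are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_length(word):
    """Stand-in for an independent task; returns the length of a word."""
    return len(word)


def summarize(lengths):
    """Stand-in for the callback; receives the list of all task results."""
    return sum(lengths)


words = ["group", "callback", "chord"]

# "Group" part: run the independent tasks in parallel.
with ThreadPoolExecutor(max_workers=3) as pool:
    lengths = list(pool.map(fetch_length, words))  # results keep input order

# "Callback" part: runs once, after every task in the group has finished.
total = summarize(lengths)
print(lengths, total)
```

In Celery the group runs on workers rather than threads, but the idea is the same: many independent tasks first, then one callback that sees all of their results.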

The Code

Refer to this gist.

Code Walkthrough

This part will cover only the specifics of the “chord” structure –

chord(run_ping.s(i, chord_service.__name__) for i in range(5))(chord_result.s())
The above line can be broken down into two pieces –

Chord Function

chord(run_ping.s(i, chord_service.__name__) for i in range(5))
In this bit, we are doing 3 steps –
  1. Build a call to the run_ping function with some parameters.
    • We also append a “.s” to the function name, which creates something called a signature. A signature records the task and its arguments without running anything, much like the arguments you would otherwise hand to “.delay”. That is enough for the moment – we’ll deal with this on a later day.
    • Ignore the “chord_service.__name__” if you like because it just returns the name of the calling function as a string, for printing in the results.
  2. Repeat this 5 times, once for each value of i.
    • In our case, it is the easiest way to create a use-case of 5 independent tasks. Soon enough, we’ll look at the use cases for chords.
  3. Pass the signatures generated by the above two steps to the chord function.
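If signatures feel abstract, functools.partial from the standard library is a rough analogy: it stores a function together with its arguments and runs nothing until it is called. The sketch below is not Celery code, and this run_ping is a made-up stand-in for the task in the gist.

```python
from functools import partial


def run_ping(item_number, caller_name):
    """Hypothetical stand-in for the post's task; just formats its arguments."""
    return f"Item Number: {item_number}, Function Name: {caller_name}"


# Each partial stores the function and its arguments but does not call it yet.
pending = [partial(run_ping, i, "chord_service") for i in range(5)]

# Nothing has run so far; calling a partial executes the deferred call.
first = pending[0]()
print(len(pending), first)
```

A Celery signature goes further than this (it can be serialized and sent to a worker), but the "describe the call now, run it later" idea is the part that matters for chords.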

Callback Function

(chord_result.s())

In this bit we’re doing two steps –

  1. Calling the object returned by chord(...)
  2. Passing the signature of the callback, chord_result, to that object as an argument

Let’s understand this a little better. Consider the example –

def foo():
    return bar

def bar(msg):
    print(msg)
 
foo()("Bello Merld")

The output of the above code would be –

Bello Merld

Note: The above explanation is just my understanding. At the time of writing, I am not sure I fully understand the implementation yet, but I will share more once I do.

One Important Detail

There is one small bit that I have excluded so far which is critical to chords. The results of the tasks in the chord become an implicit argument to the callback function.

So in our example, the result of each “run_ping” call will be collected into a list, and that list will then be implicitly sent to “chord_result.s()” as its first argument.
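Here is a toy model of that behaviour in plain Python. It is only a sketch of the calling pattern, not how Celery implements chords: MiniChord, double and this chord_result are hypothetical names, and the tasks run one after another instead of on workers.

```python
from functools import partial


class MiniChord:
    """Toy model of chord(header)(callback); not Celery's implementation."""

    def __init__(self, header):
        self.header = list(header)  # deferred calls, like the signatures

    def __call__(self, callback):
        # Run every deferred call and collect the results in order.
        results = [task() for task in self.header]
        # The collected list is passed to the callback implicitly.
        return callback(results)


def double(n):
    return n * 2


def chord_result(results, label="sum"):
    return f"{label}={sum(results)}"


outcome = MiniChord(partial(double, i) for i in range(5))(partial(chord_result))
print(outcome)
```

Notice that the caller never passes the results list to chord_result by hand; the object returned by MiniChord(...) does that, which is the same thing the real chord does for its callback.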

Chords in Action

If you run the code above, you should see something like this. Notice that the “non_chord_service” does the exact same thing that “chord_service” does.

[2018-05-30 18:58:31,619: WARNING/ForkPoolWorker-5] Bello Merld at 2018-05-30T18:58:31.618400
[2018-05-30 18:58:36,035: WARNING/ForkPoolWorker-1] Item Number: 0, Function Name: non_chord_service, Total time: 3.28 sec
[2018-05-30 18:58:36,074: WARNING/ForkPoolWorker-2] chord_service ended, Total time: 3.84 sec
[2018-05-30 18:58:38,476: WARNING/ForkPoolWorker-6] Item Number: 0, Function Name: chord_service, Total time: 3.17 sec
[2018-05-30 18:58:39,202: WARNING/ForkPoolWorker-1] Item Number: 1, Function Name: non_chord_service, Total time: 3.17 sec
[2018-05-30 18:58:39,247: WARNING/ForkPoolWorker-5] Item Number: 1, Function Name: chord_service, Total time: 3.17 sec
[2018-05-30 18:58:39,757: WARNING/ForkPoolWorker-3] Item Number: 2, Function Name: chord_service, Total time: 3.17 sec
[2018-05-30 18:58:40,275: WARNING/ForkPoolWorker-7] Item Number: 3, Function Name: chord_service, Total time: 3.16 sec
[2018-05-30 18:58:40,811: WARNING/ForkPoolWorker-2] Item Number: 4, Function Name: chord_service, Total time: 3.17 sec
[2018-05-30 18:58:42,085: WARNING/ForkPoolWorker-6] ['PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.953 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=150.405 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=181.933 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=151.034 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.953/158.331/181.933/13.632 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.745 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.429 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.615 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.767 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.429/149.639/149.767/0.134 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.507 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.846 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.528 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.665 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.507/149.636/149.846/0.135 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.546 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.657 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.664 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.612 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.546/149.620/149.664/0.047 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes 
from 192.0.78.24: icmp_seq=0 ttl=53 time=149.544 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.607 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=150.352 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.552 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.544/149.764/150.352/0.340 ms\n']
[2018-05-30 18:58:42,370: WARNING/ForkPoolWorker-1] Item Number: 2, Function Name: non_chord_service, Total time: 3.17 sec
[2018-05-30 18:58:45,535: WARNING/ForkPoolWorker-1] Item Number: 3, Function Name: non_chord_service, Total time: 3.16 sec
[2018-05-30 18:58:48,700: WARNING/ForkPoolWorker-1] Item Number: 4, Function Name: non_chord_service, Total time: 3.16 sec
[2018-05-30 18:58:48,701: WARNING/ForkPoolWorker-1] ['PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.427 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.454 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.609 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.826 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.427/149.579/149.826/0.159 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.466 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.665 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.489 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=150.198 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.466/149.704/150.198/0.295 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.562 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=150.213 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.681 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.646 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.562/149.775/150.213/0.256 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes from 192.0.78.24: icmp_seq=0 ttl=53 time=149.368 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=150.582 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=149.385 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.485 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.368/149.705/150.582/0.508 ms\n', 'PING adikrishnan.in (192.0.78.24): 56 data bytes\n64 bytes 
from 192.0.78.24: icmp_seq=0 ttl=53 time=149.941 ms\n64 bytes from 192.0.78.24: icmp_seq=1 ttl=53 time=149.936 ms\n64 bytes from 192.0.78.24: icmp_seq=2 ttl=53 time=214.971 ms\n64 bytes from 192.0.78.24: icmp_seq=3 ttl=53 time=149.582 ms\n\n--- adikrishnan.in ping statistics ---\n4 packets transmitted, 4 packets received, 0.0% packet loss\nround-trip min/avg/max/stddev = 149.582/166.107/214.971/28.212 ms\n']
[2018-05-30 18:58:48,701: WARNING/ForkPoolWorker-1] non_chord_service ended, Total time: 15.95 sec

Let’s analyze this –

  1. The function “chord_service” completes before any of the “run_ping” tasks it started have finished. This shows that a chord is asynchronous.
  2. The “run_ping” tasks started by “chord_service” are spread across different workers. This shows that a chord runs its tasks in parallel.
  3. The “run_ping” calls made by “non_chord_service” all run on a single worker, “ForkPoolWorker-1”, and the function completes only after all its “run_ping” calls complete. This shows that a for loop based implementation can be far more time consuming for your application (15.95 sec here), so avoid it when the tasks are independent.
  4. If you look at the result printed, it is a list of 5 values in both cases, and the two lists are the same except for the round-trip times and their statistics. [This goes back to prove that the results of the chord were implicitly sent as a parameter to the callback function]
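The gap between the loop and the parallel run can be reproduced without Celery. The sketch below uses a thread pool and a made-up slow_task that just sleeps, so the absolute numbers mean nothing; only the difference between the two timings is of interest.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i):
    """Stand-in for run_ping: waits briefly, then returns its input."""
    time.sleep(0.2)
    return i


items = range(5)

# Loop version: each call waits for the previous one to finish.
start = time.perf_counter()
loop_results = [slow_task(i) for i in items]
loop_seconds = time.perf_counter() - start

# Parallel version: all five calls wait at the same time.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
    parallel_results = list(pool.map(slow_task, items))
parallel_seconds = time.perf_counter() - start

print(f"loop: {loop_seconds:.2f} sec, parallel: {parallel_seconds:.2f} sec")
```

Both versions return the same list; the loop simply pays for every wait in sequence, which is the pattern visible in the “non_chord_service” log lines.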

Using Chords

Some examples could be –

  1. Sending emails to a set of people and then recording in the database which emails were sent successfully. Each task can return successful/not successful, and the callback can then perform a single update operation on the database using the collected list, which is much faster than updating inside a traditional loop.
  2. If your application talks to multiple microservices that do not depend on each other, a chord can make those calls in parallel and the callback can then proceed with the combined results.

Pitfalls and Additional Notes

If you are still reading this, I am humbled. I have made a lot of mistakes and also found interesting ways of doing things with chords. I am putting it out here so you too can benefit from it.

  1. Having a “result_backend” is a must for implementing chords. However, it can get very messy. Do take a look at my other blog post about why storing results in Celery is a bad idea.
  2. Instead of building the tasks inline with a generator expression like the one in this example, you could directly pass a list of signatures to a chord. The inputs for this list can come from a different function or even an API that you have exposed using your web application.
  3. Typically, we forget to add “.s” to all the functions that we want to run in the chord, and even to the callback function. That can make one go around in circles (and the errors are not too intuitive). A chord works with signatures, so it expects signatures everywhere.

Hope this blog post helps!
