Quand on a à traiter des choses bloquantes, avec des dépendances, des flux complexes ou des actions répétitives, créer des files d’attente peut se révéler très judicieux.
Par exemple lancer la génération d’un gros zip sur le clic d’un utilisateur, télécharger plein fichiers en parallèle pour son site de cul, lancer des calculs sur plusieurs machines et récupérer le résultat, encoder des videos en arrière plan, etc.
Le problème, c’est que fabriquer des files d’attente à la main, ça mène généralement à une grosse galère. La première boîte dans laquelle j’ai travaillé avait tout un système de queues à base de PHP + SQL fait à la main qui tapait dans du MySQL, c’était pas marrant du tout
Je fais une pause, et je note que le potentiel de jeux de mots sur cet article est fortement élevé. Mais je resterai fort.
Locking, priorité, dépendance, asynchronicité, concurrence, sérialisation, encoding, stockage, accessibilité, load balancing… Toutes ces problématiques sont bien vicieuses et chronophages. Il vaut mieux utiliser une lib solide et éprouvée.
Je resterai fort.
Kombu est une telle lib, mais elle est lourde et complexe à utiliser. J’avais fais le choix de la prendre pour un gros projet avec Max, je le regrette sur le long terme : c’est dur à maintenir et à faire évoluer. Le code est vraiment pas sympa.
Heureusement, il existe une bibliothèque qui se met au-dessus de Kombu pour et nous expose juste les fonctionnalités que l’on souhaite : celery.
Fort.
Celery est simple pour démarrer, mais très puissant si on rentre dans le détail, et croyez moi, le détail, on peut y rentrer très très profondément.
F…
Installation
Qui dit file d’attente, dit stockage. Il faut bien mettre les tâches quelque part et communiquer avec ce quelque part. En base de données ? Dans un gestionnaire de messages ? En mémoire ? Dans un cache ?
Celery résout le problème en proposant la même interface, quelque soit le support. Actuellement, on peut utiliser :
- RabbitMQ
- Redis
- MongoDB
- Beanstalk CouchDB
- SQLAlchemy ou l’ORM Django (et donc toutes les bases de données supportées comme Sqlite, MySQL, PostGres…)
- Amazon SQS
Dans notre exemple, nous allons le faire avec Redis car :
- Redis est très simple à installer et à configurer.
- Nous on utilise déjà du Redis partout.
- Aucun risque de locking.
Pour ceux qui ont pas redis, c’est généralement dans les dépôts. Par exemple sur Ubuntu :
sudo apt-get install redis-server |
Il n’y a rien à faire de plus, ça tourne, c’est configuré avec des valeurs par défaut qui sont saines. Je vous l’ai dis, redis, c’est fantastiquement bien foutu.
Ensuite on install celery et la lib d’accès à redis en Python qui porte un nom très original :
pip install celery redis |
Ca devrait compiler un peu, et comme hab avec les extensions en C, assurez vous d’avoir un compilateur et les headers en place comme indiqué dans l’article sur pip.
Ensuite on peut créer ses tâches. Créez un module, par exemple tasks.py :
import urllib2 from collections import Counter from celery import Celery # Configuration de celery. Ceci peut aussi se faire dans un fichier de config. # Ici on dit à celery que pour le module ‘tasks’, on va utiliser redis # comme broker (passeur de massage) et comme result backend (stockage du # resultat des tâches). celery = Celery(‘tasks’, broker=’redis://localhost’, backend=’redis://localhost’) # Et voici notre première tâche. C’est une fonction Python normale, décorée # avec un decorateur de celery. Elle prend une URL, et calcule le nombre # de lettre « e » qu’il y a dans la page. @celery.task def ecount(url): return Counter(urllib2.urlopen(url).read())[‘e’] |
On lance ensuite le processus celery dans un terminal (en production, mettez ça dans supervisord ou systemd pour que ça démarre automatiquement) :
[test] sam ~/Bureau/celery_test $ celery -A tasks worker -B --loglevel=info -------------- celery@sam v3.0.21 (Chiastic Slide) ---- **** ----- --- * *** * -- Linux-3.2.0-48-generic-x86_64-with-Ubuntu-12.04-precise -- * - **** --- - ** ---------- [config] - ** ---------- .> broker: redis://localhost:6379// - ** ---------- .> app: tasks:0x2a2fa50 - ** ---------- .> concurrency: 4 (processes) - *** --- * --- .> events: OFF (enable -E to monitor this worker) -- ******* ---- --- ***** ----- [queues] -------------- .> celery: exchange:celery(direct) binding:celery [Tasks] . tasks.ecount [2013-07-26 13:22:21,631: INFO/Beat] Celerybeat: Starting.. [2013-07-26 13:04:51,274: WARNING/MainProcess] celery@sam ready. [2013-07-26 13:04:51,280: INFO/MainProcess] consumer: Connected to redis://localhost:6379//.
-A
précise le module à importer, -B
démarre le beat (on verra ça plus tard), worker
dit à celery que démarrer des processus de consommation de files d’attente (par défaut 4 qui travaillent en parallèle), et --loglevel=info
va nous permettre d’avoir un affichage verbeux pour comprendre ce qui se passe.
Votre file d’attente est prête, et frétille d’impatience.
Lancer une tâche
A partir de là, vous pouvez envoyer des tâches dans la file d’attente, depuis n’importe où :
- Un script.
- Un serveur Web (par exemple une vue Django).
- Un programme sur un autre serveur (même si il faudrait alors configurer redis pour qu’il écoute sur les ports extérieurs, ce qui n’est pas le cas ici par simplicité).
- etc
Plusieurs programmes peuvent envoyer plein de tâches, en même temps, et elles vont se loger dans la file d’attente, sans bloquer le programme qui les a envoyé.
Par exemple, depuis le shell :
>>> from tasks import ecount >>> res = ecount.delay(‘http://danstonchat.com’) |
Ceci ne bloque pas mon shell, la ligne s’exécute immédiatement. La fonction ecount
n’est pas appelée depuis le shell, elle est dans la file d’attente et sera appelée par un des processus (les fameux ‘worker’) qui consomment la queue. Du côté de la file, on peut voir dans le log :
[2013-07-26 14:18:08,609: INFO/MainProcess] Got task from broker: tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df] [2013-07-26 14:18:09,070: INFO/MainProcess] Task tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df] succeeded in 0.446974039078s: 1242 |
On a donc notre tâche qui a bien été traitée.
On peut récupérer le résultat dans le shell :
>>> res.state ‘PENDING’ |
Ah… La tâche n’est pas encore terminée. Et un peu plus tard :
>>> res.state ‘SUCCESS’ >>> res.result 1242 |
Lancer une tâche est bien entendu peu intéressant, les listes d’attente sont vraiment sympa quand on a plein de tâches à lancer, par plein de processus différents :
results = [ecount.delay(url) for url in (‘http://google.com’, ‘https://sametmax.com’, ‘http://sebsauvage.com’, ‘http://multiboards.com’, ‘http://0bin.net’, ‘http://danstonchat.com’)] |
[2013-07-26 14:25:46,646: INFO/MainProcess] Got task from broker: tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724] [2013-07-26 14:25:46,649: INFO/MainProcess] Got task from broker: tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2] [2013-07-26 14:25:46,650: INFO/MainProcess] Got task from broker: tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e] [2013-07-26 14:25:46,652: INFO/MainProcess] Got task from broker: tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d] [2013-07-26 14:25:46,653: INFO/MainProcess] Got task from broker: tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246] [2013-07-26 14:25:46,654: INFO/MainProcess] Got task from broker: tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7] [2013-07-26 14:25:47,144: INFO/MainProcess] Task tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e] succeeded in 0.479865789413s: 27 [2013-07-26 14:25:47,242: INFO/MainProcess] Task tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724] succeeded in 0.578661203384s: 609 [2013-07-26 14:25:47,501: INFO/MainProcess] Task tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246] succeeded in 0.35736989975s: 263 [2013-07-26 14:25:47,645: INFO/MainProcess] Task tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7] succeeded in 0.403187036514s: 1270 [2013-07-26 14:25:47,815: INFO/MainProcess] Task tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d] succeeded in 1.14100408554s: 23 [2013-07-26 14:25:49,010: INFO/MainProcess] Task tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2] succeeded in 2.34633708s: 3158 |
Car du coup on sait que ces multiples tâches ne vont pas bloquer le processus en cour, mais qu’en plus la charge sera répartie sur le nombre de workers qu’on a décidé au départ, ni plus (surcharge du serveur), ni moins (traitement trop lent).
Comment je sais quand une tâche est terminée ?
On peut attendre que la tâche soit terminée :
>>> print res.wait() 9999 |
Mais ce n’est pas vraiment le but. On cherche avant tout à ce que les tâches soient non bloquantes, et exécutées dans un processus à part voir potentiellement distribuées sur plusieurs serveurs.
Par ailleurs, Celery n’est pas un remplacement d’un système de traitement asynchrone comme Tornado ou NodeJS, il n’est pas fait pour envoyer des réponses asynchrones à l’utilisateur. Il est fait pour faire des tâches en background, répartir la charge et ordonner le traitement. Bien entendu, on peut faire communiquer un système asynchrone avec celery comme ici ou ici, mais c’est une autre histoire.
Concentrons nous sur les tâches.
La question de “Comment je sais quand une tâche est terminée ?” est souvent traduisible par “comment je réagis à une tâche pour lancer du code quand elle s’est terminée sans erreur ?”.
Et là, il y une solution toute simple :
res = tache1.s(arg1, arg2) | tache2.s() | tache3.s(arg1) |
Ceci va créer une chaîne de tâches. Quand la première se termine, la deuxième se lance en recevant le résultat de la première en argument.
s()
fabrique une sous-tâche, c’est à dire une tâche à envoyer dans la file plus tard avec des arguments pré-enregistrés. Dans notre exemple, celery va lancer tache1
avec deux arguments, puis si ça marche, va appeler tache2
en lui passant le résultat de tache1
comme argument, puis si ça marche, va appeler tache3
avec le résultat de tache2
en premier argument et arg1
en second argument.
En fait, celery vient avec tout un tas d’outils pour exécuter des tâches dépendantes les unes des autres : par groupes, par chaînes, par morceaux, etc. Mais de toute façon, vous pouvez appeler une tâche… à l’intérieur d’une autre tâche. Donc parti de là vous pouvez faire pas mal de choses.
Comment je fais pour faire une tâche récurrente ?
C’est là qu’intervient le “beat” dont j’ai parlé tout à l’heure. Avec cette option, celery va vérifier toutes les secondes si il n’y a pas une tâche répétitive à lancer, et la mettre dans une file d’attente, à la manière d’un cron.
Il suffit de définir une tâche comme periodic_task
pour qu’elle soit lancée régulièrement.
import smtplib from celery.schedules import crontab from celery.decorators import periodic_task # va executer la tâche à 5h30, 13h30 et 23h30 tous les lundi # run_every accepte aussi un timedelta, pour par exemple dire « toutes les 10m » @periodic_task(run_every=crontab(hour=’5,13,23′, minute=30, day_of_week=’monday’)) def is_alive(): « » » Vérifie que le blog est toujours en ligne, et si ce n’est pas le cas, envoie un mail en panique. « » » if urllib2.urlopen(‘https://sametmax.com’).code != 200: mail = ‘lesametlemax__AT__gmail.com’.replace(‘__AT__’, ‘@’) server = smtplib.SMTP(‘smtp.gmail.com:587’) server.starttls() server.login(‘root’, ‘admin123’) server.sendmail(mail, mail, msg) server.quit() |
Il y a de bons exemples sur la syntaxe sur crontab()
dans la doc.
D’une manière générale, la doc de Celery est très très riche, donc plongez vous dedans si cet article ne répond pas à vos besoins, car si ça peut être mis dans une file, ça peut être fait par Celery.
Note de fin
Celery n’autoreload pas le code, donc redémarrez les workers à chaque fois que vous modifiez vos tasks.
Attention aussi aux tâches récurrentes, la suivante peut se lancer avant que la précédente soit terminée. C’est à vous de faire des tâches idempotentes, ou alors de mettre en place un système de locking.