Files de tâches et tâches récurrentes avec Celery

Files de tâches et tâches récurrentes avec Celery

Quand on a à traiter des choses bloquantes, avec des dépendances, des flux complexes ou des actions répétitives, créer des files d’attente peut se révéler très judicieux.

Par exemple lancer la génération d’un gros zip sur le clic d’un utilisateur, télécharger plein fichiers en parallèle pour son site de cul, lancer des calculs sur plusieurs machines et récupérer le résultat, encoder des videos en arrière plan, etc.

Le problème, c’est que fabriquer des files d’attente à la main, ça mène généralement à une grosse galère. La première boîte dans laquelle j’ai travaillé avait tout un système de queues à base de PHP + SQL fait à la main qui tapait dans du MySQL, c’était pas marrant du tout

Je fais une pause, et je note que le potentiel de jeux de mots sur cet article est fortement élevé. Mais je resterai fort.

Locking, priorité, dépendance, asynchronicité, concurrence, sérialisation, encoding, stockage, accessibilité, load balancing… Toutes ces problématiques sont bien vicieuses et chronophages. Il vaut mieux utiliser une lib solide et éprouvée.

Je resterai fort.

Kombu est une telle lib, mais elle est lourde et complexe à utiliser. J’avais fais le choix de la prendre pour un gros projet avec Max, je le regrette sur le long terme : c’est dur à maintenir et à faire évoluer. Le code est vraiment pas sympa.

Heureusement, il existe une bibliothèque qui se met au-dessus de Kombu pour et nous expose juste les fonctionnalités que l’on souhaite : celery.

Fort.

Celery est simple pour démarrer, mais très puissant si on rentre dans le détail, et croyez moi, le détail, on peut y rentrer très très profondément.

F…

Installation

Qui dit file d’attente, dit stockage. Il faut bien mettre les tâches quelque part et communiquer avec ce quelque part. En base de données ? Dans un gestionnaire de messages ? En mémoire ? Dans un cache ?

Celery résout le problème en proposant la même interface, quelque soit le support. Actuellement, on peut utiliser :

  • RabbitMQ
  • Redis
  • MongoDB
  • Beanstalk CouchDB
  • SQLAlchemy ou l’ORM Django (et donc toutes les bases de données supportées comme Sqlite, MySQL, PostGres…)
  • Amazon SQS
Découvrir aussi  Quels sont les logiciel permettant de créer un diaporama de présentation ?

Dans notre exemple, nous allons le faire avec Redis car :

  • Redis est très simple à installer et à configurer.
  • Nous on utilise déjà du Redis partout.
  • Aucun risque de locking.

Pour ceux qui ont pas redis, c’est généralement dans les dépôts. Par exemple sur Ubuntu :

sudo apt-get install redis-server

Il n’y a rien à faire de plus, ça tourne, c’est configuré avec des valeurs par défaut qui sont saines. Je vous l’ai dis, redis, c’est fantastiquement bien foutu.

Ensuite on install celery et la lib d’accès à redis en Python qui porte un nom très original :

pip install celery redis

Ca devrait compiler un peu, et comme hab avec les extensions en C, assurez vous d’avoir un compilateur et les headers en place comme indiqué dans l’article sur pip.

Ensuite on peut créer ses tâches. Créez un module, par exemple tasks.py :

import urllib2   from collections import Counter   from celery import Celery   # Configuration de celery. Ceci peut aussi se faire dans un fichier de config. # Ici on dit à celery que pour le module ‘tasks’, on va utiliser redis # comme broker (passeur de massage) et comme result backend (stockage du # resultat des tâches). celery = Celery(‘tasks’, broker=’redis://localhost’, backend=’redis://localhost’)     # Et voici notre première tâche. C’est une fonction Python normale, décorée # avec un decorateur de celery. Elle prend une URL, et calcule le nombre # de lettre « e » qu’il y a dans la page. @celery.task def ecount(url): return Counter(urllib2.urlopen(url).read())[‘e’]

On lance ensuite le processus celery dans un terminal (en production, mettez ça dans supervisord ou systemd pour que ça démarre automatiquement) :

[test] sam ~/Bureau/celery_test $ celery -A tasks worker -B --loglevel=info

 -------------- celery@sam v3.0.21 (Chiastic Slide)
---- **** -----
--- * ***  * -- Linux-3.2.0-48-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> broker:      redis://localhost:6379//
- ** ---------- .> app:         tasks:0x2a2fa50
- ** ---------- .> concurrency: 4 (processes)
- *** --- * --- .> events:      OFF (enable -E to monitor this worker)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery:      exchange:celery(direct) binding:celery


[Tasks]
  . tasks.ecount

[2013-07-26 13:22:21,631: INFO/Beat] Celerybeat: Starting..
[2013-07-26 13:04:51,274: WARNING/MainProcess] celery@sam ready.
[2013-07-26 13:04:51,280: INFO/MainProcess] consumer: Connected to redis://localhost:6379//.

-A précise le module à importer, -B démarre le beat (on verra ça plus tard), worker dit à celery que démarrer des processus de consommation de files d’attente (par défaut 4 qui travaillent en parallèle), et --loglevel=info va nous permettre d’avoir un affichage verbeux pour comprendre ce qui se passe.

Découvrir aussi  Pourquoi créer un site e-commerce ?

Votre file d’attente est prête, et frétille d’impatience.

Lancer une tâche

A partir de là, vous pouvez envoyer des tâches dans la file d’attente, depuis n’importe où :

  • Un script.
  • Un serveur Web (par exemple une vue Django).
  • Un programme sur un autre serveur (même si il faudrait alors configurer redis pour qu’il écoute sur les ports extérieurs, ce qui n’est pas le cas ici par simplicité).
  • etc

Plusieurs programmes peuvent envoyer plein de tâches, en même temps, et elles vont se loger dans la file d’attente, sans bloquer le programme qui les a envoyé.

Par exemple, depuis le shell :

>>> from tasks import ecount >>> res = ecount.delay(‘http://danstonchat.com’)

Ceci ne bloque pas mon shell, la ligne s’exécute immédiatement. La fonction ecount n’est pas appelée depuis le shell, elle est dans la file d’attente et sera appelée par un des processus (les fameux ‘worker’) qui consomment la queue. Du côté de la file, on peut voir dans le log :

[2013-07-26 14:18:08,609: INFO/MainProcess] Got task from broker: tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df] [2013-07-26 14:18:09,070: INFO/MainProcess] Task tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df] succeeded in 0.446974039078s: 1242

On a donc notre tâche qui a bien été traitée.

On peut récupérer le résultat dans le shell :

>>> res.state ‘PENDING’

Ah… La tâche n’est pas encore terminée. Et un peu plus tard :

>>> res.state ‘SUCCESS’ >>> res.result 1242

Lancer une tâche est bien entendu peu intéressant, les listes d’attente sont vraiment sympa quand on a plein de tâches à lancer, par plein de processus différents :

results = [ecount.delay(url) for url in (‘http://google.com’, ‘https://sametmax.com’, ‘http://sebsauvage.com’, ‘http://multiboards.com’, ‘http://0bin.net’, ‘http://danstonchat.com’)]
[2013-07-26 14:25:46,646: INFO/MainProcess] Got task from broker: tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724] [2013-07-26 14:25:46,649: INFO/MainProcess] Got task from broker: tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2] [2013-07-26 14:25:46,650: INFO/MainProcess] Got task from broker: tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e] [2013-07-26 14:25:46,652: INFO/MainProcess] Got task from broker: tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d] [2013-07-26 14:25:46,653: INFO/MainProcess] Got task from broker: tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246] [2013-07-26 14:25:46,654: INFO/MainProcess] Got task from broker: tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7] [2013-07-26 14:25:47,144: INFO/MainProcess] Task tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e] succeeded in 0.479865789413s: 27 [2013-07-26 14:25:47,242: INFO/MainProcess] Task tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724] succeeded in 0.578661203384s: 609 [2013-07-26 14:25:47,501: INFO/MainProcess] Task tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246] succeeded in 0.35736989975s: 263 [2013-07-26 14:25:47,645: INFO/MainProcess] Task tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7] succeeded in 0.403187036514s: 1270 [2013-07-26 14:25:47,815: INFO/MainProcess] Task tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d] succeeded in 1.14100408554s: 23 [2013-07-26 14:25:49,010: INFO/MainProcess] Task tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2] succeeded in 2.34633708s: 3158

Car du coup on sait que ces multiples tâches ne vont pas bloquer le processus en cour, mais qu’en plus la charge sera répartie sur le nombre de workers qu’on a décidé au départ, ni plus (surcharge du serveur), ni moins (traitement trop lent).

Découvrir aussi  Le Pep8 en résumé, c'est quoi ?

Comment je sais quand une tâche est terminée ?

On peut attendre que la tâche soit terminée :

>>> print res.wait() 9999

Mais ce n’est pas vraiment le but. On cherche avant tout à ce que les tâches soient non bloquantes, et exécutées dans un processus à part voir potentiellement distribuées sur plusieurs serveurs.

Par ailleurs, Celery n’est pas un remplacement d’un système de traitement asynchrone comme Tornado ou NodeJS, il n’est pas fait pour envoyer des réponses asynchrones à l’utilisateur. Il est fait pour faire des tâches en background, répartir la charge et ordonner le traitement. Bien entendu, on peut faire communiquer un système asynchrone avec celery comme ici ou ici, mais c’est une autre histoire.

Concentrons nous sur les tâches.

La question de “Comment je sais quand une tâche est terminée ?” est souvent traduisible par “comment je réagis à une tâche pour lancer du code quand elle s’est terminée sans erreur ?”.

Et là, il y une solution toute simple :

res = tache1.s(arg1, arg2) | tache2.s() | tache3.s(arg1)

Ceci va créer une chaîne de tâches. Quand la première se termine, la deuxième se lance en recevant le résultat de la première en argument.

s() fabrique une sous-tâche, c’est à dire une tâche à envoyer dans la file plus tard avec des arguments pré-enregistrés. Dans notre exemple, celery va lancer tache1 avec deux arguments, puis si ça marche, va appeler tache2 en lui passant le résultat de tache1 comme argument, puis si ça marche, va appeler tache3 avec le résultat de tache2 en premier argument et arg1 en second argument.

En fait, celery vient avec tout un tas d’outils pour exécuter des tâches dépendantes les unes des autres : par groupes, par chaînes, par morceaux, etc. Mais de toute façon, vous pouvez appeler une tâche… à l’intérieur d’une autre tâche. Donc parti de là vous pouvez faire pas mal de choses.

Comment je fais pour faire une tâche récurrente ?

C’est là qu’intervient le “beat” dont j’ai parlé tout à l’heure. Avec cette option, celery va vérifier toutes les secondes si il n’y a pas une tâche répétitive à lancer, et la mettre dans une file d’attente, à la manière d’un cron.

Il suffit de définir une tâche comme periodic_task pour qu’elle soit lancée régulièrement.

import smtplib   from celery.schedules import crontab from celery.decorators import periodic_task   # va executer la tâche à 5h30, 13h30 et 23h30 tous les lundi # run_every accepte aussi un timedelta, pour par exemple dire « toutes les 10m » @periodic_task(run_every=crontab(hour=’5,13,23′, minute=30, day_of_week=’monday’)) def is_alive(): «  » » Vérifie que le blog est toujours en ligne, et si ce n’est pas le cas, envoie un mail en panique. «  » » if urllib2.urlopen(‘https://sametmax.com’).code != 200: mail = ‘lesametlemax__AT__gmail.com’.replace(‘__AT__’, ‘@’) server = smtplib.SMTP(‘smtp.gmail.com:587’) server.starttls() server.login(‘root’, ‘admin123’) server.sendmail(mail, mail, msg) server.quit()

Il y a de bons exemples sur la syntaxe sur crontab() dans la doc.

D’une manière générale, la doc de Celery est très très riche, donc plongez vous dedans si cet article ne répond pas à vos besoins, car si ça peut être mis dans une file, ça peut être fait par Celery.

Note de fin

Celery n’autoreload pas le code, donc redémarrez les workers à chaque fois que vous modifiez vos tasks.

Attention aussi aux tâches récurrentes, la suivante peut se lancer avant que la précédente soit terminée. C’est à vous de faire des tâches idempotentes, ou alors de mettre en place un système de locking.

4.1/5 - (28 votes)
John Loerse

A propos de l'auteur

John Loerse est un spécialiste du marketing qui a une passion pour la rédaction. Il a travaillé dans plusieurs entreprises au fil des ans, acquérant une grande expérience dans le domaine du marketing et de la communication. Grâce à sa solide expertise en rédaction et en communication, John a réussi à aider de nombreuses entreprises à atteindre leurs objectifs de marketing et à se développer de manière significative. Finalement, John a décidé de rejoindre Sametmax, une entreprise de mise en relation de freelance et entreprises, pour poursuivre sa carrière dans le marketing et la rédaction. Chez Sametmax, John travaille avec une équipe talentueuse de professionnels pour aider les entreprises à trouver les freelances les plus qualifiés et à réaliser des projets de qualité supérieure.

Laisser un commentaire