Bagaimana cara membuat Seledri mengirim tugas ke pekerja yang tepat saat menggunakan send_task?

Misalnya, diberikan layanan berikut: service_add.py

from celery import Celery
    
    celery = Celery('service_add', backend='redis://localhost', broker='pyamqp://') 
    
    @celery.task
    def add(x,y):
       return x+y

Service_sub.py

from celery import Celery,shared_task

celery = Celery('service_sub', backend='redis://localhost', broker='pyamqp://') #redis backend,rabbitmq for messaging

@celery.task
def sub(x,y):
    return x-y

Kode berikut dijamin gagal: main.py

from celery.execute import send_task
result1 = send_task('service_sub.sub',(1,1)).get()
result2 = send_task('service_sub.sub',(1,1)).get()

Dengan pengecualian celery.exceptions.NotRegistered: 'service_sub.sub' karena Celery mengirimkan setiap proses tugas secara round-robin, meskipun service_sub hanya dimiliki oleh satu proses.

Agar pertanyaan selesai, inilah cara saya menjalankan proses dan file konfigurasi:

celery -A  service_sub worker --loglevel=INFO --pool=solo -n worker1
celery -A  service_add worker --loglevel=INFO --pool=solo -n worker2

Celeryconfig.py

## Broker settings.
broker_url = 'pyamqp://'

# List of modules to import when the Celery worker starts.
imports = ('service_add.py','service_sub.py')
0
yoni keren 2 Mei 2021, 01:16

1 menjawab

Jawaban Terbaik

Jika Anda menggunakan dua aplikasi berbeda service_add / service_sub hanya untuk mencapai perutean tugas ke pekerja khusus, saya ingin menyarankan solusi lain. Jika bukan itu masalahnya dan Anda masih membutuhkan dua (atau lebih aplikasi) saya sarankan enkapsulasi broker dengan lebih baik seperti amqp://localhost:5672/add_vhost & backend: redis://localhost/1. Memiliki vhost khusus di rabbitMQ dan id database khusus (1) di Redis mungkin berhasil.

Karena itu, saya pikir solusi yang tepat dalam kasus seperti itu adalah menggunakan aplikasi seledri yang sama (tidak membelah menjadi dua aplikasi) dan menggunakan router:

task_routes = {'tasks.service_add': {'queue': 'add'}, 'tasks.service_sub': {'queue': 'sub'}}

Tambahkan ke konfigurasi:

app.conf.task_routes = task_routes

Dan jalankan pekerja Anda dengan Q (dari antrian mana untuk membaca pesan):

celery -A  shared_app worker --loglevel=INFO --pool=solo -n worker1 -Q add
celery -A  shared_app worker --loglevel=INFO --pool=solo -n worker2 -Q sub

Perhatikan bahwa pendekatan ini memiliki lebih banyak manfaat, seperti jika Anda ingin memiliki beberapa ketergantungan antar tugas (kanvas).

Ada lebih banyak cara untuk mendefinisikan router, Anda dapat membaca lebih lanjut tentangnya di sini.

1
ItayB 2 Mei 2021, 05:43