Katakanlah saya sedang membuat aplikasi obrolan sederhana menggunakan gRPC dengan .proto berikut:

service Chat {
    rpc SendChat(Message) returns (Void);
    rpc SubscribeToChats(Void) returns (stream Message);
}

message Void {}
message Message {string text = 1;}

Cara saya sering melihat servicer diimplementasikan (dalam contoh) dengan Python adalah seperti ini:

class Servicer(ChatServicer):
    def __init__(self):
        self.messages = []

    def SendChat(self, request, context):
        self.messages.append(request.text)
        return Void()

    def SubscribeToChats(self, request, context):
        while True:
            if len(self.messages) > 0:
                yield Message(text=self.messages.pop())    

Meskipun ini berhasil, tampaknya sangat tidak efisien untuk menelurkan loop tak terbatas yang terus-menerus memeriksa kondisi untuk setiap klien yang terhubung. Akan lebih baik daripada memiliki sesuatu seperti ini, di mana pengiriman dipicu tepat saat pesan masuk dan tidak memerlukan polling konstan dengan syarat:

class Servicer(ChatServicer):
    def __init__(self):
        self.listeners = []

    def SendChat(self, request, context):
        for listener in self.listeners:
            listener(Message(request.text))
        return Void()

    def SubscribeToChats(self, request, context, callback):
        self.listeners.append(callback)    

Namun, sepertinya saya tidak dapat menemukan cara untuk melakukan sesuatu seperti ini menggunakan gRPC.

Saya memiliki pertanyaan berikut:

  1. Apakah saya benar bahwa infinite loop tidak efisien untuk kasus seperti ini? Atau apakah ada pengoptimalan yang terjadi di latar belakang yang tidak saya sadari?
  2. Apakah ada cara yang efisien untuk mencapai sesuatu yang mirip dengan solusi pilihan saya di atas? Sepertinya kasus penggunaan yang cukup umum, jadi saya yakin ada sesuatu yang saya lewatkan.

Terima kasih sebelumnya!

0
Joshua F 10 Mei 2021, 09:20

1 menjawab

Jawaban Terbaik

Saya menemukan cara yang efisien untuk melakukannya. Kuncinya adalah menggunakan API AsyncIO. Sekarang fungsi SubscribeToChats saya bisa menjadi generator asinkron, yang membuat segalanya lebih mudah.

Sekarang saya dapat menggunakan sesuatu seperti Antrian asyncio, yang fungsi saya dapat menunggu dalam loop sementara. Mirip dengan ini:

class Servicer(ChatServicer):
    def __init__(self):
        self.queue = asyncio.Queue()

    async def SendChat(self, request, context):
        await self.queue.put(request.text)
        return Void()

    async def SubscribeToChats(self, request, context):
        while True:
            yield Message(text=await self.queue.get())
0
Joshua F 11 Mei 2021, 05:55