Saya ingin menerapkan server streaming yang mengirim dan aliran data tanpa akhir ke semua klien yang terhubung. Beberapa klien harus dapat terhubung dan memutuskan sambungan dari server untuk memproses data dengan cara yang berbeda.

Setiap klien dilayani oleh ClientThread khusus, yang mensub-kelaskan Thread dan berisi antrian data yang akan dikirim ke klien (diperlukan, karena klien mungkin memproses data pada kecepatan yang berbeda dan karena semburan data dapat terjadi di mana klien mungkin tidak mampu menangani).

Program mendengarkan koneksi klien yang masuk melalui ClientHandlerThread yang terpisah. Setiap kali klien terhubung, ClientHandlerThread memunculkan ClientThread dan menambahkannya ke daftar.

Sebagai contoh dummy, Thread utama menambah bilangan bulat setiap detik dan mendorongnya ke semua antrian ClientThread melalui ClientHandlerThread.push_item().

Setiap 10 kenaikan, jumlah item dalam antrian klien dicetak.


Sekarang untuk pertanyaan saya:

Ketika klien terputus, Thread berakhir dan tidak ada lagi data yang dikirim, namun, objek ClientThread tetap berada dalam daftar klien ClientHandlerThreads dan item terus didorong ke antriannya.

Karena itu saya mencari (1) cara untuk menghapus objek ClientThread dari daftar setiap kali klien terputus, (2) cara yang lebih baik untuk memantau ClientThreads daripada daftar atau (3) arsitektur berbeda (lebih baik) untuk diarsipkan tujuanku.

Terimakasih banyak!


Server

import socket
import time

from threading import Thread
from queue import Queue


class ClientThread(Thread):

    def __init__(self, conn, client_addr):
        Thread.__init__(self)

        self.queue = Queue()

        self.conn = conn
        self.client_addr = client_addr

    def run(self):
        print('Client connected')

        while True:
            try:
                self.conn.sendall(self.queue.get().encode('utf-8'))
                time.sleep(1)
            except BrokenPipeError:
                print('Client disconnected')
                break


class ClientHandlerThread(Thread):

    def __init__(self):
        Thread.__init__(self, daemon = True)

        self.clients = list()

    def push_item(self, item):
        for client in self.clients:
            client.queue.put(str(i))

    def run(self):

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:

            s.bind('./socket')
            s.listen()

            i = 1

            while True:
                conn, client_addr = s.accept()

                client = ClientThread(conn, client_addr)
                client.start()
                self.clients.append(client)

                i += 1


if __name__ == '__main__':

    client_handler = ClientHandlerThread()
    client_handler.start()

    i = 1

    while True:
        client_handler.push_item(str(i))

        if i % 10 == 0:
            print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')


        i += 1
        time.sleep(1)

Klien:

import socket

if __name__ == '__main__':
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect('./socket')

        print('Connected to server')

        while True:
            data = s.recv(1024)

            if not data:
                print('Disconnected from server')
                break

            print(data.decode('utf-8'))
1
Scholar 3 Agustus 2019, 05:40

1 menjawab

Jawaban Terbaik

Catatan Anda mungkin harus membaca tentang hal-hal seperti aiohttp untuk versi jawaban yang lebih skalabel.


Untuk pertanyaan Anda, Anda dapat membuat beberapa perubahan untuk mencapai ini:

Pertama, ubah konstruktor ClientThread:

class ClientThread(Thread):

    def __init__(self, client_handler, conn, client_addr):
        self.client_handler = client_handler
        self.running = True
        ...

Saat pawang membuat objek, ia harus meneruskan self sebagai client_handler.

Di run method, gunakan

   def run(self):
        while True:
            ...

        self.running = False
        self.client_handler.purge() 

Artinya, ia menandai dirinya sebagai tidak berjalan, dan memanggil metode purge handler. Ini dapat ditulis sebagai

class ClientHandlerThread(Thread):
    ...
    def purge(self):
        self.clients = [c for c in self.clients if c.running]
2
Ami Tavory 3 Agustus 2019, 10:01