Source code for golf_federated.client.communication.sse.monitor

# -*- coding: utf-8 -*-
# @Author             : GZH
# @Created Time       : 2022/11/14 16:00
# @Email              : guozh29@mail2.sysu.edu.cn
# @Last Modified By   : GZH
# @Last Modified Time : 2022/11/14 16:00

import threading
from sseclient import SSEClient

from golf_federated.utils.log import loggerhear


[docs]def monitor( client: object, host: str, port: str, thread_name: str = 'monitor', ) -> None: """ Create an information channel and listen to server pushes. Args: client (MultiDeviceClient): Client to listen. host (str): Host name to connect to the host. port (str): Port number to connect to the host. thread_name (str): Child thread name. Default as 'monitor' """ # Define the thread running process function. def run_in_thread() -> None: """ Child thread receives the information pushed by SSE(Server-Sent Events) client. """ while True: # Receive information. try: messages = SSEClient("http://" + host + ":" + str(port) + "/sse") except: messages = [] for msg in messages: # Judge information content. if msg.data == str(b'UpdateModel'): # Update local model. client.update_model() client.train() client.upload_local_weight() elif msg.data == str(b'StopTrain'): # Stop local training. client.stop() elif msg.data == str(b'TaskInit'): # Init training task. client.init_trainer() client.train() client.upload_local_weight() # Define the thread running process class. class ChildThread(threading.Thread): def __init__( self, name: str ) -> None: """ Initialize child thread class object. Args: name (str):Child thread name. """ threading.Thread.__init__(self) self.name = name def run(self) -> None: """ Run the process of child thread. """ run_in_thread() # Start child thread loggerhear.log( "Client Info ", "Client %s is creating an information channel and listening to server pushes." % client.client_name ) ChildThread(thread_name).start()