Source code for golf_federated.server.process.strategy.aggregation.asynchronous

# -*- coding: utf-8 -*-
# @Author             : GZH
# @Created Time       : 2022/11/3 12:43
# @Email              : guozh29@mail2.sysu.edu.cn
# @Last Modified By   : GZH
# @Last Modified Time : 2022/11/3 12:43

from queue import Queue
from typing import List

from golf_federated.server.process.strategy.aggregation.base import BaseFed
from golf_federated.server.process.strategy.aggregation.function import fedfd, SLMFedasyn, fedasync
from golf_federated.server.process.strategy.selection.function import softmax_prob_from_indicators
from golf_federated.utils.log import loggerhear


[docs]class FedFD(BaseFed): """ Asynchronous FL with FedFD, inheriting from BaseFed class. """ def __init__( self, min_to_start: int = 2 ) -> None: """ Initialize the FedFD object. Args: min_to_start (int): Minimum number of received local model parameters for global model aggregation. Default as 2. """ # Super class init. super().__init__( name='fedfd', synchronous=False, min_to_start=min_to_start ) loggerhear.log("Server Info ", "Being Adopting FedFD")
[docs] def aggregate( self, datadict: { 'current_w': List, 'parameter': Queue, 'record' : List } ) -> List: """ Abstract method for aggregation. Args: datadict (dict): Data that will be input into the aggregation function, including current global model weights, client uploaded parameters and evaluation records. Returns: List: The model generated after aggregation. And use a list to store the parameters of different layers. """ # Create temporary variables client_id = [] weight = dict() client_round = dict() # Get the specified data. version_latest = self.aggregation_version parameter = datadict['parameter'] while not parameter.empty(): temp = parameter.get() client_id.append(temp['name']) weight[temp['name']] = temp['model'] client_round[temp['name']] = temp['aggregation_field']['clientRound'] # Calling aggregation function. current_global_w = fedfd( client_id=client_id, weight=weight, client_round=client_round, version_latest=version_latest ) # Counter plus one. self.aggregation_version += 1 return current_global_w
[docs] def get_field(self) -> List: """ Get the fields needed for aggregation. Returns: List: Fields needed for aggregation """ # Rounds of local training. return ['clientRound']
[docs]class FedAsync(BaseFed): """ Asynchronous FL with FedAsync, inheriting from BaseFed class. From: "Asynchronous Federated Optimization" (https://opt-ml.org/oldopt/papers/2020/paper_28.pdf) """ def __init__( self, alpha: float = 0.5, beta: float = 0.0, staleness: str = 'Polynomial', min_to_start: int = 2 ) -> None: """ Initialize the FedAsync object. Args: alpha (float): Corresponds to the parameter α defined in FedAsync. Default as 0.5. beta (float): Corresponds to the parameter β defined in FedAsync. Default as 0.0. staleness (str): Corresponds to the name of the function defined in FedAsync. Default as 'Polynomial'. min_to_start (int): Minimum number of received local model parameters for global model aggregation. Default as 2. """ # Super class init. super().__init__( name='fedasync', synchronous=False, min_to_start=min_to_start ) # Initialize object properties. self.alpha = alpha self.beta = beta self.staleness = staleness loggerhear.log("Server Info ", "Being Adopting FedAsync")
[docs] def aggregate( self, datadict: { 'current_w': List, 'parameter': Queue, 'record' : List } ) -> List: """ Abstract method for aggregation. Args: datadict (dict): Data that will be input into the aggregation function, including current global model weights, client uploaded parameters and evaluation records. Returns: List: The model generated after aggregation. And use a list to store the parameters of different layers. """ # Create temporary variables client_id = [] weight = dict() # Get the specified data. current_weight = datadict['current_w'] current_round = self.aggregation_version client_round = dict() parameter = datadict['parameter'] while not parameter.empty(): temp = parameter.get() client_id.append(temp['name']) weight[temp['name']] = temp['model'] client_round[temp['name']] = temp['aggregation_field']['clientRound'] # Calling aggregation function. current_global_w = fedasync( client_id=client_id, weight=weight, staleness=self.staleness, current_weight=current_weight, current_round=current_round, client_round=client_round, alpha=self.alpha, beta=self.beta ) # Counter plus one. self.aggregation_version += 1 return current_global_w
[docs] def get_field(self) -> List: """ Get the fields needed for aggregation. Returns: List: Fields needed for aggregation """ # Rounds of local training. return ['clientRound']
[docs]class SLMFed_asyn(BaseFed): """ Asynchronous FL with SLMFed, inheriting from BaseFed class. """ def __init__( self, target_acc: float, func: str = 'other', min_to_start: int = 2, ) -> None: """ Initialize the SLMFed_asyn object. Args: target_acc(float): Target accuracy of the task. func (str): Function to adjust aggregation weights. Default as 'other'. min_to_start (int): Minimum number of received local model parameters for global model aggregation. Default as 2. """ # Super class init. super().__init__( name='SLMFed_asyn', synchronous=False, min_to_start=min_to_start ) # Initialize object properties. self.func = func self.target_acc = target_acc loggerhear.log("Server Info ", "Being Adopting SLMFed_asyn")
[docs] def aggregate( self, datadict: { 'current_w': List, 'parameter': Queue, 'record' : List } ) -> List: """ Abstract method for aggregation. Args: datadict (dict): Data that will be input into the aggregation function, including current global model weights, client uploaded parameters and evaluation records. Returns: List: The model generated after aggregation. And use a list to store the parameters of different layers. """ # Create temporary variables client_information_richness = [] client_datasize = [] client_id = [] weight = dict() aggregate_percentage = dict() # Get the specified data. current_weight = datadict['current_w'] current_acc = datadict['record'][-1] target_acc = self.target_acc parameter = datadict['parameter'] while not parameter.empty(): temp = parameter.get() client_id.append(temp['name']) weight[temp['name']] = temp['model'] client_information_richness[temp['name']] = temp['aggregation_field']['informationRichness'] client_datasize[temp['name']] = temp['aggregation_field']['dataSize'] # Get weight of clients for aggregation. percentage = softmax_prob_from_indicators([client_information_richness, client_datasize]) for i in range(len(client_id)): aggregate_percentage[client_id[i]] = percentage[i] # Calling aggregation function. current_global_w = SLMFedasyn( client_id=client_id, weight=weight, aggregate_percentage=aggregate_percentage, current_weight=current_weight, current_acc=current_acc, func=self.func, target_acc=target_acc ) # Counter plus one. self.aggregation_version += 1 return current_global_w
[docs] def get_field(self) -> List: """ Get the fields needed for aggregation. Returns: List: Fields needed for aggregation """ # Information richness and data size of client. return ['informationRichness', 'dataSize']