Source code for golf_federated.server.process.strategy.aggregation.synchronous

# -*- coding: utf-8 -*-
# @Author             : GZH
# @Created Time       : 2022/11/3 12:43
# @Email              : guozh29@mail2.sysu.edu.cn
# @Last Modified By   : GZH
# @Last Modified Time : 2022/11/3 12:43
from copy import deepcopy
from queue import Queue
from typing import List, Tuple

import pandas as pd

from golf_federated.server.process.strategy.aggregation.function import fedavg, SLMFedsyn, Cedarsyn
from golf_federated.server.process.strategy.aggregation.base import BaseFed
from golf_federated.server.process.strategy.selection.function import softmax_prob_from_indicators
from golf_federated.utils.log import loggerhear


class FedAVG(BaseFed):
    """
    Synchronous federated averaging (FedAVG) strategy built on ``BaseFed``.

    Reference:
        McMahan et al., "Communication-Efficient Learning of Deep Networks from
        Decentralized Data"
        (http://proceedings.mlr.press/v54/mcmahan17a/mcmahan17a.pdf)
    """

    def __init__(
            self,
            min_to_start: int = 2
    ) -> None:
        """
        Register the strategy with ``BaseFed`` and announce it in the server log.

        Args:
            min_to_start (int): Minimum number of received local model
                parameters before a global aggregation. Defaults to 2.
        """
        base_kwargs = dict(name='fedavg', synchronous=True, min_to_start=min_to_start)
        super().__init__(**base_kwargs)
        loggerhear.log("Server Info ", "Being Adopting FedAVG")

    def aggregate(
            self,
            datadict: {
                'current_w': List,
                'parameter': Queue,
                'record': List
            }
    ) -> List:
        """
        Drain the upload queue and combine the local models with ``fedavg``.

        Args:
            datadict (dict): Aggregation input. Only ``datadict['parameter']`` is
                read here. It is a queue whose items carry ``'model'`` and
                ``['aggregation_field']['dataSize']``.

        Returns:
            List: The result of ``fedavg``, i.e. the new global model stored as a
            list of per-layer parameters.
        """
        upload_queue = datadict['parameter']
        local_models, local_sizes = [], []
        while not upload_queue.empty():
            upload = upload_queue.get()
            local_models.append(upload['model'])
            local_sizes.append(upload['aggregation_field']['dataSize'])

        new_global = fedavg(local_models, local_sizes)
        # The version is bumped only after fedavg returned without raising.
        self.aggregation_version += 1
        return new_global

    def get_field(self) -> List:
        """
        List the per-client fields this strategy needs in ``aggregation_field``.

        Returns:
            List: ``['dataSize']``, the client's local data size.
        """
        required = ['dataSize']
        return required
class FedProx(BaseFed):
    """
    Synchronous FL with FedProx on top of ``BaseFed``.

    On this (server) side, aggregation is the same ``fedavg`` call on models and
    data sizes that FedAVG uses. ``miu`` is only stored on the object; it is not
    read by any method of this class (presumably consumed elsewhere; confirm).
    """

    def __init__(
            self,
            miu: float = 1,
            min_to_start: int = 2
    ) -> None:
        """
        Create the FedProx strategy.

        Args:
            miu (float): The FedProx proximal coefficient μ. Defaults to 1.
            min_to_start (int): Minimum number of received local model
                parameters before a global aggregation. Defaults to 2.
        """
        super().__init__(
            name='fedprox',
            min_to_start=min_to_start,
            synchronous=True,
        )
        # Proximal coefficient μ; not referenced by aggregate() below.
        self.miu = miu
        loggerhear.log("Server Info ", "Being Adopting FedProx")

    def aggregate(
            self,
            datadict: {
                'current_w': List,
                'parameter': Queue,
                'record': List
            }
    ) -> List:
        """
        Pull every pending upload and aggregate them via ``fedavg``.

        Args:
            datadict (dict): Aggregation input. Only ``datadict['parameter']`` is
                used. Each queued item provides ``'model'`` and
                ``['aggregation_field']['dataSize']``.

        Returns:
            List: The aggregated global model as a list of per-layer parameters.
        """
        queue_in = datadict['parameter']
        models = []
        sizes = []
        while True:
            if queue_in.empty():
                break
            item = queue_in.get()
            models.append(item['model'])
            sizes.append(item['aggregation_field']['dataSize'])

        aggregated = fedavg(models, sizes)
        self.aggregation_version += 1
        return aggregated

    def get_field(self) -> List:
        """
        Report which ``aggregation_field`` entries clients must upload.

        Returns:
            List: A single-element list naming the client data size field.
        """
        return list(('dataSize',))
class SLMFed_syn(BaseFed):
    """
    Synchronous FL with SLMFed, inheriting from BaseFed class.

    Each client's share in the aggregate is computed by
    ``softmax_prob_from_indicators`` from two per-client indicators: the
    uploaded information richness and the local data size.
    """

    def __init__(
            self,
            min_to_start: int = 2
    ) -> None:
        """
        Initialize the SLMFed_syn object.

        Args:
            min_to_start (int): Minimum number of received local model parameters
                for global model aggregation. Default as 2.
        """
        super().__init__(
            name='SLMFed_syn',
            synchronous=True,
            min_to_start=min_to_start
        )
        loggerhear.log("Server Info ", "Being Adopting SLMFed_syn")

    def aggregate(
            self,
            datadict: {
                'current_w': List,
                'parameter': Queue,
                'record': List
            }
    ) -> List:
        """
        Aggregate all queued local models into a new global model with SLMFed.

        Args:
            datadict (dict): Aggregation input. ``'current_w'`` holds the current
                global weights. ``'parameter'`` is a queue whose items carry
                ``'model'`` and ``'aggregation_field'`` with
                ``'informationRichness'`` and ``'dataSize'``.

        Returns:
            List: The model generated after aggregation, as a list of per-layer
            parameters.
        """
        # Three parallel lists: index k refers to the same client in all of them.
        # (Previously these were index-assigned by client name into empty lists,
        # which raised IndexError/TypeError on the first upload.)
        client_information_richness = []
        client_datasize = []
        weight = []

        current_weight = datadict['current_w']
        parameter = datadict['parameter']
        while not parameter.empty():
            temp = parameter.get()
            aggregation_field = temp['aggregation_field']
            weight.append(temp['model'])
            client_information_richness.append(aggregation_field['informationRichness'])
            client_datasize.append(aggregation_field['dataSize'])

        # Per-client aggregation weights derived from both indicators.
        aggregate_percentage = softmax_prob_from_indicators(
            [client_information_richness, client_datasize]
        )
        current_global_w = SLMFedsyn(
            weight=weight,
            aggregate_percentage=aggregate_percentage,
            current_weight=current_weight
        )
        self.aggregation_version += 1
        return current_global_w

    def get_field(self) -> List:
        """
        Get the fields needed for aggregation.

        Returns:
            List: Information richness and data size of the client, in that order.
        """
        return ['informationRichness', 'dataSize']
class Cedar_syn(BaseFed):
    """
    Synchronous FL with Cedar, inheriting from BaseFed class.

    Clients may upload only some layers. Missing layers are filled from the
    current global model. Per layer, the aggregation weight is the reciprocal
    of the number of clients that flagged that layer as updated. The optional
    malicious-update check (``detect``) is passed through to ``Cedarsyn``.
    """

    def __init__(
            self,
            dataset_path: str,
            min_to_start: int = 2,
            num_class: int = 0,
            detect: bool = False
    ) -> None:
        """
        Initialize the Cedar object.

        Args:
            dataset_path (str): Path to the stimulus dataset, loaded with
                ``torch.load``.
            min_to_start (int): Minimum number of received local model parameters
                for global model aggregation. Default as 2.
            num_class (int): Number of data classes. Default as 0.
            detect (bool): Whether to detect malicious updates. Default as False.
        """
        super().__init__(
            name='Cedar_syn',
            synchronous=True,
            min_to_start=min_to_start
        )
        # One entry per aggregation round: the non-zero per-layer update counts.
        self.layer_num_list = []
        self.detect = detect
        self.num_class = num_class
        self.dataset_path = dataset_path
        # Stimulus set with (at most) one sample per class, built once here.
        self.stimulus_x, self.stimulus_y = self.prepare_stimulus_LFA(1)
        loggerhear.log("Server Info ", "Being Adopting Cedar_syn")

    def aggregate(
            self,
            datadict: {
                'current_w': List,
                'parameter': Queue,
                'record': List
            }
    ) -> List:
        """
        Aggregate all queued local models into a new global model with Cedar.

        Args:
            datadict (dict): Aggregation input. ``'current_w'`` is the current
                global model object. It must expose ``state_dict()`` and survive
                ``deepcopy``/``load_state_dict`` (presumably a
                ``torch.nn.Module``; confirm). ``'parameter'`` is a queue whose
                items carry ``'model'`` (a mapping of layer name to weights, with
                ``None`` for layers the client did not upload) and
                ``'aggregation_field'`` with ``'upgrade_bool'``,
                ``'REQUIRE_JUDGE_LAYER'`` and ``'NUM_LAYER'``.

        Returns:
            List: Whatever ``Cedarsyn`` returns as the new global model.

        Raises:
            ValueError: If the parameter queue is empty.
        """
        current_model = datadict['current_w']
        parameter = datadict['parameter']

        local_model = []
        upgrade_bool_list = []
        upgrade_bool_frames = []
        require_judge_layer = None
        num_layer = None
        while not parameter.empty():
            temp = parameter.get()
            aggregation_field = temp['aggregation_field']
            local_model.append(temp['model'])
            upgrade_bool_list.append(aggregation_field['upgrade_bool'])
            upgrade_bool_frames.append(pd.DataFrame(aggregation_field['upgrade_bool']))
            # Overwritten per upload: the values from the last upload are used.
            require_judge_layer = aggregation_field['REQUIRE_JUDGE_LAYER']
            num_layer = aggregation_field['NUM_LAYER']

        if not local_model:
            # Previously surfaced as an UnboundLocalError further down.
            raise ValueError("Cedar_syn.aggregate called with an empty parameter queue.")

        # Rows = layers, one column per client. A single concat avoids quadratic
        # re-copying and pandas' empty-frame concat warning.
        upgrade_bool_dataframe = pd.concat(upgrade_bool_frames, axis=1)
        require_judge_layer = list(require_judge_layer)

        # Turn each upload into a full model object. Layers the client left as
        # None fall back to the current global weights. current_model is not
        # modified in this loop, so its state dict is built once.
        current_state = current_model.state_dict()
        local_model_object = []
        for l_m in local_model:
            for k, v in l_m.items():
                if v is None:
                    l_m[k] = current_state[k]
            temp_model = deepcopy(current_model)
            temp_model.load_state_dict(l_m)
            local_model_object.append(temp_model)

        layer_weight = []
        layer_sum = []
        for i in range(num_layer):
            # Number of clients that flagged layer i as updated.
            updated_count = upgrade_bool_dataframe.iloc[i, :].sum()
            if updated_count != 0:
                layer_sum.append(updated_count)
                layer_weight.append(1 / updated_count)
            else:
                # NOTE(review): string sentinel kept as-is; Cedarsyn presumably
                # compares against 'inf'. Confirm before switching to float('inf').
                layer_weight.append('inf')
        self.layer_num_list.append(layer_sum)

        current_global_w = Cedarsyn(
            local_model=local_model_object,
            detect=self.detect,
            stimulus_x=self.stimulus_x,
            current_model=current_model,
            num_layer=num_layer,
            layer_weight=layer_weight,
            upgrade_bool_list=upgrade_bool_list,
            require_judge_layer=require_judge_layer
        )
        self.aggregation_version += 1
        return current_global_w

    def get_field(self) -> list:
        """
        Get the fields needed for aggregation.

        Returns:
            list: An empty list. Note that ``aggregate`` still reads
            ``'upgrade_bool'``, ``'REQUIRE_JUDGE_LAYER'`` and ``'NUM_LAYER'``
            from each upload's ``aggregation_field``. Those are presumably
            populated by the Cedar client independently of this list (confirm).
        """
        return []

    def prepare_stimulus_LFA(
            self,
            each_num: int
    ) -> Tuple:
        """
        Build a class-balanced stimulus set from the dataset at ``dataset_path``.

        The dataset is scanned in order. The first ``each_num`` samples whose
        label equals each class ``0 .. num_class - 1`` are kept. Dataset items are
        expected to be indexable as ``(x, label)``.

        Args:
            each_num (int): Number of samples per class in the stimuli.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: Stacked stimulus samples (first
            dimension = number of kept samples) and a tensor of their labels. If
            no sample is kept, the first element is an empty list (kept for
            backward compatibility).
        """
        import torch

        # NOTE(review): torch.load unpickles the file; dataset_path must be trusted.
        stimulus_data_ori = torch.load(self.dataset_path)

        # class_count[c] = number of samples of class c collected so far.
        class_count = [0] * self.num_class
        samples = []
        stimulus_y = []
        for i in range(len(stimulus_data_ori)):
            # Stop scanning once every class has its quota.
            if all(count >= each_num for count in class_count):
                break
            # Fetch the item once; Dataset indexing may be expensive or apply transforms.
            item = stimulus_data_ori[i]
            sample_x, sample_y = item[0], item[1]
            for single_class in range(self.num_class):
                if class_count[single_class] < each_num and sample_y == single_class:
                    # as_tensor avoids torch.tensor's copy-construct warning on tensors.
                    samples.append(torch.as_tensor(sample_x))
                    stimulus_y.append(sample_y)
                    class_count[single_class] += 1
                    break

        # One stack instead of repeated torch.cat (same result, linear cost).
        stimulus_x = torch.stack(samples, 0) if samples else []
        return stimulus_x, torch.tensor(stimulus_y)