Source code for golf_federated.server.process.strategy.aggregation.function

# -*- coding: utf-8 -*-
# @Author             : GZH
# @Created Time       : 2022/11/3 12:43
# @Email              : guozh29@mail2.sysu.edu.cn
# @Last Modified By   : GZH
# @Last Modified Time : 2022/11/3 12:43
from copy import deepcopy
from typing import Callable, List
import numpy as np
import math
import pandas as pd

from golf_federated.utils.data import deepcopy_list


def fedavg(
        weight: List,
        data_size: List
) -> List:
    """
    Function implementation of FedAVG, which averages the collected model parameters, weighting each
    client model by its data size.

    Args:
        weight (List): List of models to aggregate. Each model is a list of per-layer parameters.
        data_size (List): List of data sizes of clients, aligned with `weight`.

    Returns:
        List: The model generated after aggregation. And use a list to store the parameters of different layers.
            When all layers share one shape the result is a nested Python list; otherwise each entry is
            the aggregated `numpy.ndarray` of one layer.

    Raises:
        ValueError: If no model is given, if `weight` and `data_size` differ in length,
            or if the data sizes sum to zero.

    """

    # Number of clients uploading local model parameters.
    num = len(weight)
    if num == 0:
        raise ValueError('fedavg requires at least one model to aggregate.')
    if num != len(data_size):
        raise ValueError('weight and data_size must have the same length.')
    total_size = sum(data_size)
    if total_size == 0:
        raise ValueError('The sum of data_size must not be zero.')

    def _as_layer_array(model):
        # Layers usually have different shapes; recent NumPy versions refuse to build such a ragged
        # array implicitly, so fall back to an explicit object array holding one ndarray per layer.
        try:
            return np.array(model)
        except ValueError:
            layers = np.empty(len(model), dtype=object)
            for index, layer in enumerate(model):
                layers[index] = np.asarray(layer)
            return layers

    # Accumulate the data-size-weighted sum of each model parameter value, and then take the average.
    # Out-of-place addition avoids dtype casting errors when the first term is an integer array.
    result = _as_layer_array(weight[0]) * data_size[0]
    for model, size in zip(weight[1:], data_size[1:]):
        result = result + _as_layer_array(model) * size

    return (result / total_size).tolist()
def fedfd(
        client_id: List,
        weight: dict,
        client_round: dict,
        version_latest: int,
) -> List:
    """
    Function implementation of FedFD, which weighted averages the corresponding values of collected model
    parameters. Each client model is weighted by `(version_latest - client_round + 1) ** -0.5`.

    Args:
        client_id (List): ID of clients that upload the models.
        weight (dict): Corresponding dictionary of client IDs and models to aggregate.
        client_round (dict): Corresponding dictionary of client IDs and number of training rounds for local models.
        version_latest (int): Latest model version.

    Returns:
        List: The model generated after aggregation. And use a list to store the parameters of different layers.

    Raises:
        ValueError: If none of the given clients has a model in `weight`.

    """

    def _as_layer_array(model):
        # Layers usually have different shapes; recent NumPy versions refuse to build such a ragged
        # array implicitly, so fall back to an explicit object array holding one ndarray per layer.
        try:
            return np.array(model)
        except ValueError:
            layers = np.empty(len(model), dtype=object)
            for index, layer in enumerate(model):
                layers[index] = np.asarray(layer)
            return layers

    total = 0
    weighted_sum = None
    for c in client_id:
        try:
            weight_c = weight[c]
        except LookupError:
            # Clients without an uploaded model are skipped.
            continue

        theta = (version_latest - client_round[c] + 1) ** (-0.5)
        contribution = theta * _as_layer_array(weight_c)
        weighted_sum = contribution if weighted_sum is None else weighted_sum + contribution
        # Only clients that actually contribute enter the normaliser; otherwise skipped clients
        # would shrink the aggregated model towards zero.
        total += theta

    if weighted_sum is None:
        raise ValueError('None of the given clients has a model to aggregate.')

    return (weighted_sum / total).tolist()
def fedasync(
        client_id: List,
        weight: dict,
        staleness: str,
        current_weight: List,
        current_round: int,
        client_round: dict,
        alpha: float,
        beta: float
) -> List:
    """
    Function implementation of FedAsync.

    Every uploaded model is scaled by `alpha * s`, where `s` is the staleness factor of that client.
    The scaled models are averaged and mixed into the current global model with the mean of the
    per-client factors. The inputs are not modified.

    Args:
        client_id (List): List of uploaded client names.
        weight (dict): Dict of uploaded local model weight.
        staleness (str): Corresponds to the name of the function defined in FedAsync. One of 'Linear',
            'Polynomial', 'Exponential' or 'Hinge'; any other value uses a constant factor of 1.
        current_weight (List): Current global model parameters.
        current_round (int): Number of current training round.
        client_round (dict): Number of global round corresponding to the model trained by each client.
        alpha (float): Corresponds to the parameter α defined in FedAsync. It is also used as the
            coefficient inside the staleness functions.
        beta (float): Corresponds to the parameter β defined in FedAsync. Only used by 'Hinge'.

    Returns:
        List: The model generated after aggregation. And use a list to store the parameters of different layers.

    Raises:
        ValueError: If `client_id` is empty.

    """

    if len(client_id) == 0:
        raise ValueError('fedasync requires at least one uploaded client model.')

    # Initialize temporary variables. No defensive copies are needed because every update below
    # builds new objects instead of modifying the inputs in place.
    layer_num = len(current_weight)
    alpha_clients = []
    weight_list = [None] * layer_num

    # For all uploaded clients, model parameters are weightily summarized.
    for c_id in client_id:
        c_weight = weight[c_id]
        lag = current_round - client_round[c_id]
        if staleness == 'Linear':
            s = 1 / (alpha * lag + 1)
        elif staleness == 'Polynomial':
            s = math.pow(lag + 1, (-alpha))
        elif staleness == 'Exponential':
            s = math.exp(-alpha * lag)
        elif staleness == 'Hinge':
            if lag <= beta:
                s = 1
            else:
                # The "+ 1" keeps the factor continuous at lag == beta and bounded by 1.
                s = 1 / (alpha * (lag - beta) + 1)
        else:
            s = 1
        alpha_c = s * alpha
        alpha_clients.append(alpha_c)
        for l in range(layer_num):
            scaled = c_weight[l] * alpha_c
            weight_list[l] = scaled if weight_list[l] is None else weight_list[l] + scaled

    # The summarized model parameters are averaged to obtain the global model parameters.
    alpha_total = sum(alpha_clients)
    avg_alpha = alpha_total / len(alpha_clients)
    return [
        (1 - avg_alpha) * current_weight[l] + avg_alpha * weight_list[l] / alpha_total
        for l in range(layer_num)
    ]
def SLMFedsyn(
        weight: List,
        aggregate_percentage: List,
        current_weight: List
) -> List:
    """
    Function implementation of SLMFed_Sync.

    For every layer, the parameters uploaded by the clients are summed with their aggregate weights.
    A client may upload None for a layer; in that case the sum is rescaled by the share of the
    clients that did upload it. A layer that no client uploaded keeps the current global value.

    Args:
        weight (List): List of client model parameters for aggregation. Entries of a model may be None.
        aggregate_percentage (List): Aggregate weights for each client.
        current_weight (List): Current global model parameters.

    Returns:
        List: The model generated after aggregation. And use a list to store the parameters of different layers.

    """

    # Start from a copy of the current global model so that untouched layers keep their value.
    return_result = deepcopy_list(current_weight)
    client_num = len(weight)
    if client_num == 0:
        return return_result
    layer_num = len(weight[0])

    # Update the parameters for each layer of the model separately.
    # The client models are indexed directly: converting the whole (ragged) structure into one
    # NumPy array is unnecessary and fails on recent NumPy versions.
    for l in range(layer_num):
        layer_sum = None
        none_count = 0
        none_prob = 0

        # Calculate parameters for all clients.
        for i in range(client_num):
            layer = weight[i][l]
            if layer is None:
                none_count += 1
                none_prob += aggregate_percentage[i]
                continue
            content = np.asarray(layer) * aggregate_percentage[i]
            layer_sum = content if layer_sum is None else layer_sum + content

        if layer_sum is None:
            # No client uploaded this layer.
            continue

        # Adjust the parameters according to the proportion of None
        if none_count > 0:
            layer_sum = layer_sum / (1 - none_prob)
        return_result[l] = layer_sum

    return return_result
def SLMFedasyn(
        client_id: List,
        weight: dict,
        aggregate_percentage: dict,
        current_weight: List,
        current_acc: float,
        target_acc: float,
        func: str
) -> List:
    """
    Function implementation of SLMFed_Async.

    For every layer, the uploaded parameters are averaged with the clients' aggregate weights and
    then blended with the current global value. The share kept from the current global value is
    `(1 - p_sum) * alpha`, where `p_sum` is the total aggregate weight of the clients that uploaded
    the layer and `alpha` is derived from `current_acc / target_acc` through `func`.

    Args:
        client_id (List): List of client IDs for aggregation.
        weight (dict): Dictionary of client model parameters for aggregation. Entries of a model may be None.
        aggregate_percentage (dict): Aggregate weights for each client.
        current_weight (List): Current global model parameters.
        current_acc (float): Current accuracy corresponding to the global model.
        target_acc (float): Target accuracy of the task.
        func (str): Function to adjust aggregation weights. One of 'linear', 'concave_exp',
            'convex_quadratic', 'concave_quadratic' or 'convex_exp'; any other value uses alpha = 0.5.

    Returns:
        List: The model generated after aggregation. And use a list to store the parameters of different layers.

    """

    # Start from a copy of the current global model so that untouched layers keep their value.
    return_result = deepcopy_list(current_weight)
    if len(client_id) == 0:
        # Nothing was uploaded, so the global model stays as it is.
        return return_result

    layer_num = len(weight[client_id[0]])
    if func == 'linear':
        alpha = current_acc / target_acc
    elif func == 'concave_exp':
        alpha = 1 - math.exp(-2 * math.e * current_acc / target_acc)
    elif func == 'convex_quadratic':
        alpha = (current_acc / target_acc) ** 2
    elif func == 'concave_quadratic':
        alpha = 1 - (current_acc / target_acc - 1) ** 2
    elif func == 'convex_exp':
        alpha = (math.exp(current_acc / target_acc) - 1) / (math.e - 1)
    else:
        alpha = 0.5

    # Update the parameters for each layer of the model separately.
    for l in range(layer_num):
        p_sum = 0
        upload_sum = None

        # Calculate parameters for all clients.
        for c_id in client_id:
            layer = weight[c_id][l]
            if layer is None:
                continue
            p = aggregate_percentage[c_id]
            p_sum += p
            content = layer * p
            upload_sum = content if upload_sum is None else upload_sum + content

        # Adjust parameters according to established rules.
        # A non-zero p_sum implies that at least one client uploaded this layer.
        if p_sum != 0:
            q_new = (1 - p_sum) * alpha
            new_content = upload_sum / p_sum
            old_content = return_result[l]
            return_result[l] = new_content * (1 - q_new) + old_content * q_new

    return return_result
def fedprox_loss(
        model_library: str,
        w_global: List,
        w_local: List,
        miu: float = 1
) -> Callable:
    """
    The optimized loss function defined in FedProx: the task loss plus the proximal term
    `miu / 2 * ||w_local - w_global||^2`.

    Args:
        model_library (str): The library used to build model.
        w_global (List): Global model.
        w_local (List): Local model.
        miu (float): Corresponds to the parameter μ defined in FedProx. Default as 1.

    Returns:
        Callable: Loss function taking `(y_true, y_pred)`. For 'tensorflow' and 'keras' it returns the
            sparse categorical cross-entropy plus the proximal term. For any other library it
            returns None.

    """

    def loss(y_true, y_pred):
        # Squared Euclidean distance between the local and the global model, summed over all layers.
        # It is recomputed on every call from the lists captured above.
        proximal_term = 0.0
        for w_g, w_l in zip(w_global, w_local):
            difference = np.asarray(w_l) - np.asarray(w_g)
            proximal_term += float(np.sum(np.square(difference)))

        if model_library == 'tensorflow' or model_library == 'keras':
            from tensorflow.keras.losses import sparse_categorical_crossentropy
            return sparse_categorical_crossentropy(y_true, y_pred) + miu / 2 * proximal_term

        # NOTE(review): no loss is implemented for other libraries; None is returned as before.
        return None

    return loss
def mean2(x):
    """
    Mean of all elements of a tensor.

    Args:
        x (torch.Tensor): Input tensor of any shape.

    Returns:
        torch.Tensor: Zero-dimensional tensor holding the mean over every element of `x`.

    """

    import torch

    # Divide by the number of elements, not by the length of the first dimension,
    # so that matrices are averaged over all entries.
    y = torch.sum(x) / x.numel()
    return y
def corr2(a, b):
    """
    Correlation coefficient between two tensors of the same shape, computed over all elements.

    Args:
        a (torch.Tensor): First tensor.
        b (torch.Tensor): Second tensor.

    Returns:
        torch.Tensor: Zero-dimensional tensor holding the correlation coefficient. The result is NaN
            when either input is constant, because its centred sum of squares is zero.

    """

    import torch

    # Centre both inputs with the mean over every element (not the per-row sum),
    # which is what makes the ratio below a correlation coefficient.
    a = a - torch.sum(a) / a.numel()
    b = b - torch.sum(b) / b.numel()
    r = torch.sum(a * b) / torch.sqrt(torch.sum(a * a) * torch.sum(b * b))
    return r
import torch from torch.autograd import Variable
def Cedarsyn(
        local_model: List,
        detect: bool,
        current_model: object,
        num_layer: int,
        layer_weight: List,
        stimulus_x: torch.Tensor,
        require_judge_layer: List,
        upgrade_bool_list: pd.DataFrame,
) -> object:
    """
    Function implementation of Cedar_Sync.

    Each client model is scored by the correlation between its representational dissimilarity matrix
    (RDM) and the RDM of the current global model, both built from the softmax outputs on the first
    10 stimuli. The normalised scores are the aggregation weights of the layers that are not involved
    in filtering. Layers involved in filtering are summed with `layer_weight` over the clients
    that uploaded them. `current_model` is updated in place.

    Args:
        local_model (List): List of client model parameters for aggregation.
        detect (bool): Whether to detect malicious updates. If True, the scores are split into two
            clusters and the clients outside the larger cluster get an aggregation weight of zero.
        current_model (object): Current global model.
        num_layer (int): Number of layers involved in filtering.
        layer_weight (List): Weights to measure the importance of layers. The string 'inf' marks a
            layer whose previous global value is added back after aggregation.
        stimulus_x (torch.tensor): Stimuli. At least 10 are required.
        require_judge_layer (List): Layers involved in filtering, matched as substrings of parameter names.
        upgrade_bool_list (pandas.DataFrame): Matrix of whether the layer is uploaded, read as
            `upgrade_bool_list[client_index][layer_index]`.

    Returns:
        object: Updated model (the same object as `current_model`).

    """

    from torch.nn import functional as F

    client_num = len(local_model)
    if client_num == 0:
        # Without any upload there is nothing to aggregate; leave the global model untouched.
        return current_model

    stimulus_num = 10
    require_fill_layer = [
        require_judge_layer[i] for i in range(num_layer) if layer_weight[i] == 'inf'
    ]
    global_model_copy = deepcopy(current_model)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def _build_rdm(model):
        # The forward pass runs on a copy so that the given model is not affected by it.
        output = deepcopy(model)(stimulus_x)
        softmax = F.softmax(output, dim=1).detach().to(torch.float32)
        rdm = torch.zeros((stimulus_num, stimulus_num)).to(device)
        for i in range(stimulus_num):
            for j in range(stimulus_num):
                rdm[i][j] = torch.cosine_similarity(softmax[i].view(1, -1), softmax[j].view(1, -1))
        return rdm

    # The RDM of the global model does not depend on the client, so it is built only once.
    global_rdm = _build_rdm(current_model)

    aggregate_percentage = []
    RS = np.zeros([1, client_num])
    for index, client_model in enumerate(local_model):
        score = float(corr2(_build_rdm(client_model), global_rdm))
        RS[0][index] = score
        aggregate_percentage.append(score)

    if detect:
        rs_range = RS.max() - RS.min()
        # Identical scores cannot be separated into two clusters, so detection is skipped then.
        if rs_range > 0:
            from collections import Counter
            from sklearn.cluster import KMeans

            RS = (RS - RS.min()) / rs_range
            cluster_result = KMeans(n_clusters=2).fit(RS.reshape(-1, 1)).labels_
            # The larger cluster is regarded as benign.
            cluster_benign = Counter(cluster_result).most_common(1)[0][0]
            for i in range(client_num):
                if cluster_result[i] != cluster_benign:
                    aggregate_percentage[i] = 0

    sum_aggregate_percentage = sum(aggregate_percentage)
    aggregate_percentage = np.array([p / sum_aggregate_percentage for p in aggregate_percentage])

    for j in range(client_num):
        parameter_pairs = zip(current_model.named_parameters(), local_model[j].named_parameters())
        for (global_name, global_param), (_, client_param) in parameter_pairs:
            if j == 0:
                # Reset the global parameter before the first client is accumulated.
                global_param.data.zero_()

            if not any(layer in global_name for layer in require_judge_layer):
                global_param.data.add_(client_param.data * aggregate_percentage[j])
                continue

            for layer in require_judge_layer:
                if layer in global_name:
                    idx = list(require_judge_layer).index(layer)
                    # NOTE(review): assumes upgrade_bool_list is never 1 for an 'inf' layer,
                    # since a string weight cannot scale a tensor - confirm with the caller.
                    if upgrade_bool_list[j][idx] == 1:
                        global_param.data.add_(client_param.data * layer_weight[idx])

    # Layers marked 'inf' get the previous global value added back.
    for layer in require_fill_layer:
        copy_pairs = zip(current_model.named_parameters(), global_model_copy.named_parameters())
        for (global_name, global_param), (_, global_copy_param) in copy_pairs:
            if layer in global_name:
                global_param.data.add_(global_copy_param.data)

    return current_model