Strategy框架设计#
什么是Strategy?#
联邦学习策略是联邦学习区别于常规分布式机器学习的关键差别,对于模型训练效果影响非常关键。Strategy在联邦学习中主要由两部分组成,一部分是local部分的本地训练策略,其中包含对于loss函数的控制(和server端如何对齐等),梯度或者参数怎么上行传输;第二部分是server部分,包含如何对Clients上传的梯度或参数如何聚合,如何更新,如何下行分发等。SecretFlow在FLModel之下封装了一层Strategy,控制模型在联邦场景下的学习策略,隐语提供了Strategy Zoo,目前已支持涵盖Non-iid和通信优化等多种策略,而且在不断的迭代更新,用户只需要传递不同策略的alias name(“fed_avg_w”,”fed_avg_g”,”fed_prox”,”fed_scr”等),即可完成调用。同时Strategy框架支持用户自定义策略开发、注册、使用等。
SecretFlow的Strategy的定位以及特点#
根据strategy name来拉起不同的Workers
在Worker中定义如何进行local计算,计算完成后要上传哪些参数(g/w),是否压缩等。
在FLModel中根据strategy name来决定 server端如何计算。
为用户二次开发新的strategy提供了用户友好的接口,用户只需要定义好train_step,并提供聚合逻辑即可完成strategy的开发, 同时只需要用户一行代码就可以完成注册
架构图#
Strategy中包含了三个部分
local部分 train step的计算逻辑,以及需要传输的tensor内容
上下行数据传输的压缩方法
server部分的聚合逻辑
使用内置Strategy#
使用built-in Strategy是非常直接以及简单的,用户只需要把想要使用的strategy 方法名传入”FLModel”即可。目前提供的strategy(持续更新中):
fed_model = FLModel(
server=charlie,
device_list=device_list,
model=model,
aggregator=secure_aggregator,
strategy="fed_avg_w", # Just pass the strategy name to strategy
backend = "tensorflow")
自定义Strategy#
自定义strategy仅需以下几步
local train strategy
server aggragate strategy (not supported)
compressor strategy
register strategy
The local strategy#
SecretFlow中的local strategy是封装在fl_base之上的一层,继承BaseModel的所有属性和方法,并提供专属与特定strategy 的local 计算逻辑
class FedCustom(BaseTFModel):
"""
FedAvgW: A naive implementation of FedAvg, where the clients upload their trained model
weights to the server for averaging and update their local models via the aggregated weights
from the server in each federated round.
"""
def train_step(
self,
weights: np.ndarray,
cur_steps: int,
train_steps: int,
**kwargs,
) -> Tuple[np.ndarray, int]:
"""Accept ps model params, then do local train
Args:
updates: global updates from params server
cur_steps: current train step
train_steps: local training steps
kwargs: strategy-specific parameters
Returns:
Parameters after local training
"""
# Do local training
return model_weights, num_sample
compressor定义#
定义专属与当前strategy的tensor压缩算法,加快传输速度。将您需要进行下行压缩的策略加入COMPRESS_STRATEGY,FLModel会根据判断来决定是否在聚合后下行传输到client的时候进行压缩。client到server的上行阶段压缩,在您的local training strategy中完成即可。
COMPRESS_STRATEGY = ("fed_stc", "fed_scr")
def stc_compress(compressor, server_weights, agg_updates, res):
def _add(matrices_a: List, matrices_b: List):
results = [np.add(a, b) for a, b in zip(matrices_a, matrices_b)]
return results
def _subtract(matrices_a: List, matrices_b: List):
results = [np.subtract(a, b) for a, b in zip(matrices_a, matrices_b)]
return results
if res:
agg_updates = _add(agg_updates, res)
sparse_agg_updates = compressor(agg_updates)
res = _subtract(agg_updates, sparse_agg_updates)
server_weights = _add(server_weights, sparse_agg_updates)
return server_weights, sparse_agg_updates, res
register strategy#
@register_strategy(strategy_name='fed_custom_name', backend='tensorflow')
@proxy(PYUObject)
class PYUFedCustom(FedCustom):
pass
到这里就完成开发了,您可以将构造好新的strategy_name传入FL_Model的strategy字段,就可以启用新的联邦学习策略啦