secretflow.ml.nn.fl#

secretflow.ml.nn.fl.compress#

Functions:

stc_compress(compressor, server_weights, ...)

do_compress([strategy, sparsity, ...])

secretflow.ml.nn.fl.compress.stc_compress(compressor, server_weights, agg_updates, res)[source]#
secretflow.ml.nn.fl.compress.do_compress(strategy='batch', sparsity=0.0, server_weights=None, updates=None, res=None)[source]#
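The sparsity parameter suggests that do_compress drops low-magnitude entries of the updates before transmission. A minimal sketch of magnitude-based sparsification under that assumed semantics (the sparsify helper below is a hypothetical stand-in, not the library's implementation):

```python
import numpy as np

def sparsify(update: np.ndarray, sparsity: float) -> np.ndarray:
    """Zero out the smallest-magnitude entries, keeping roughly a
    (1 - sparsity) fraction of the update. Hypothetical stand-in
    for what a sparsity-based compressor computes."""
    if sparsity <= 0.0:
        return update
    k = int(update.size * sparsity)  # number of entries to drop
    if k == 0:
        return update
    flat = np.abs(update).ravel()
    # k-th smallest magnitude serves as the drop threshold.
    threshold = np.partition(flat, k - 1)[k - 1]
    # Keep entries strictly above the threshold magnitude.
    return np.where(np.abs(update) > threshold, update, 0.0)

update = np.array([0.5, -0.01, 0.2, 0.003, -0.7])
compressed = sparsify(update, sparsity=0.4)  # two smallest entries zeroed
```

Only the surviving entries (plus their indices) would need to be sent, which is where the bandwidth saving comes from.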

secretflow.ml.nn.fl.fl_model#

FedModel

Classes:

FLModel([server, device_list, model, ...])

class secretflow.ml.nn.fl.fl_model.FLModel(server=None, device_list: List[PYU] = [], model: Union[TorchModel, Callable[[], tensorflow.keras.Model]] = None, aggregator=None, strategy='fed_avg_w', consensus_num=1, backend='tensorflow', random_seed=None, **kwargs)[source]#

Bases: object

Methods:

__init__([server, device_list, model, ...])

Interface for horizontal federated learning.

init_workers(model, device_list, strategy, ...)

initialize_weights()

fit(x, y[, batch_size, batch_sampling_rate, ...])

Horizontal federated training interface

predict(x[, batch_size, label_decoder, ...])

Horizontal federated offline prediction interface

evaluate(x[, y, batch_size, sample_weight, ...])

Horizontal federated offline evaluation interface

save_model(model_path[, is_test, saved_model])

Horizontal federated save model interface

load_model(model_path[, is_test, ...])

Horizontal federated load model interface

__init__(server=None, device_list: List[PYU] = [], model: Union[TorchModel, Callable[[], tensorflow.keras.Model]] = None, aggregator=None, strategy='fed_avg_w', consensus_num=1, backend='tensorflow', random_seed=None, **kwargs)[source]#

Interface for horizontal federated learning.

server#

PYU, which PYU acts as the server

device_list#

party list

model#

model definition function

aggregator#

Security aggregator, which can be selected according to the required security level

strategy#

Federated training strategy

consensus_num#

Number of parties required for consensus; some strategies require multiple parties to reach consensus

backend#

Engine backend; the backend needs to be consistent with the model type

random_seed#

If specified, the initial value of the model will remain the same, which ensures reproducibility
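Under the default 'fed_avg_w' strategy, the aggregator combines per-party weights into a single global model. A sketch of the sample-count-weighted averaging such a strategy typically computes (illustrative only; the fed_avg_w function below is a hypothetical helper, not SecretFlow's code):

```python
import numpy as np

def fed_avg_w(weights_per_party, num_samples_per_party):
    """Sample-count-weighted average of per-party model weights.
    Each party's weights is a list of layer arrays; average layer-wise."""
    total = sum(num_samples_per_party)
    coeffs = [n / total for n in num_samples_per_party]
    return [
        sum(c * layers[i] for c, layers in zip(coeffs, weights_per_party))
        for i in range(len(weights_per_party[0]))
    ]

# Two parties with a single one-layer "model"; bob holds 3x more samples.
alice = [np.array([1.0, 2.0])]
bob = [np.array([3.0, 4.0])]
avg = fed_avg_w([alice, bob], num_samples_per_party=[1, 3])
# avg[0] == [2.5, 3.5]: bob's weights dominate with coefficient 0.75
```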

init_workers(model, device_list, strategy, backend, random_seed)[source]#
initialize_weights()[source]#
fit(x: Union[HDataFrame, FedNdarray, Dict[PYU, str]], y: Union[HDataFrame, FedNdarray, str], batch_size: Union[int, Dict[PYU, int]] = 32, batch_sampling_rate: Optional[float] = None, epochs: int = 1, verbose: int = 1, callbacks=None, validation_data=None, shuffle=False, class_weight=None, sample_weight=None, validation_freq=1, aggregate_freq=1, label_decoder=None, max_batch_size=20000, prefetch_buffer_size=None, sampler_method='batch', random_seed=None, dp_spent_step_freq=None, audit_log_dir=None, dataset_builder: Optional[Dict[PYU, Callable]] = None) → History[source]#

Horizontal federated training interface

Parameters:
  • x – feature, FedNdArray, HDataFrame or Dict {PYU: model_path}

  • y – label, FedNdArray, HDataFrame or str(column name of label)

  • batch_size – Number of samples per gradient update, int or Dict, recommend 64 or more for safety

  • batch_sampling_rate – Ratio of sample per batch, float

  • epochs – Number of epochs to train the model

  • verbose – 0, 1. Verbosity mode

  • callbacks – List of keras.callbacks.Callback instances.

  • validation_data – Data on which to evaluate

  • shuffle – whether to shuffle the training data

  • class_weight – Dict mapping class indices (integers) to a weight (float)

  • sample_weight – weights for the training samples

  • validation_freq – specifies how many training epochs to run before a new validation run is performed

  • aggregate_freq – Number of training steps between aggregations

  • label_decoder – Only used for CSV reading, for label preprocess

  • max_batch_size – Max limit of batch size

  • prefetch_buffer_size – An int specifying the number of feature batches to prefetch for performance improvement. Only for csv reader

  • sampler_method – The name of sampler method

  • random_seed – PRG seed for shuffling

  • dp_spent_step_freq – specifies after how many training steps the DP budget is checked

  • audit_log_dir – path of the audit log dir; a checkpoint will be saved if audit_log_dir is not None

  • dataset_builder – Callable function describing how to build the dataset; must return (dataset, steps_per_epoch)

Returns:

A History object. Its global_history attribute is an aggregated record of training loss values and metrics, while its local_history attribute is a record of the training loss values and metrics of each party.

predict(x: Union[HDataFrame, FedNdarray, Dict], batch_size=None, label_decoder=None, sampler_method='batch', random_seed=1234, dataset_builder: Optional[Dict[PYU, Callable]] = None) → Dict[PYU, PYUObject][source]#

Horizontal federated offline prediction interface

Parameters:
  • x – feature, FedNdArray or HDataFrame

  • batch_size – Number of samples per gradient update, int or Dict

  • label_decoder – Only used for CSV reading, for label preprocess

  • sampler_method – The name of sampler method

  • random_seed – PRG seed for shuffling

  • dataset_builder – Callable function describing how to build the dataset; must return (dataset, steps_per_epoch)

Returns:

predict results, numpy.array

evaluate(x: Union[HDataFrame, FedNdarray, Dict], y: Optional[Union[HDataFrame, FedNdarray, str]] = None, batch_size: Union[int, Dict[PYU, int]] = 32, sample_weight: Optional[Union[HDataFrame, FedNdarray]] = None, label_decoder=None, return_dict=False, sampler_method='batch', random_seed=None, dataset_builder: Optional[Dict[PYU, Callable]] = None) → Tuple[Union[List[Metric], Dict[str, Metric]], Union[Dict[str, List[Metric]], Dict[str, Dict[str, Metric]]]][source]#

Horizontal federated offline evaluation interface

Parameters:
  • x – Input data. It could be a FedNdarray, an HDataFrame, or a Dict {PYU: model_path}

  • y – Label. It could be a FedNdarray, an HDataFrame, or a str (column name of the label in a CSV)

  • batch_size – Integer or Dict. Number of samples per batch of computation. If unspecified, batch_size will default to 32.

  • sample_weight – Optional Numpy array of weights for the test samples, used for weighting the loss function.

  • label_decoder – User-defined function for handling the label column when using the CSV reader

  • return_dict – If True, loss and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list.

  • sampler_method – The name of sampler method.

  • dataset_builder – Callable function describing how to build the dataset; must return (dataset, steps_per_epoch)

Returns:

A tuple of two objects. The first object is an aggregated record of metrics, and the second object is a record of the training loss values and metrics of each party.

save_model(model_path: Union[str, Dict[PYU, str]], is_test=False, saved_model=False)[source]#

Horizontal federated save model interface

Parameters:
  • model_path – model path, only supports format like 'a/b/c', where c is the model name

  • is_test – whether in test mode

  • saved_model – bool, whether to save in SavedModel or TorchScript format

load_model(model_path: Union[str, Dict[PYU, str]], is_test=False, saved_model=False, force_all_participate=False)[source]#

Horizontal federated load model interface

Parameters:
  • model_path – model path

  • is_test – whether in test mode

  • saved_model – bool, whether to load from SavedModel or TorchScript format

secretflow.ml.nn.fl.strategy_dispatcher#

Classes:

Dispatcher()

Functions:

register_strategy([_cls, strategy_name, backend])

register new strategy

dispatch_strategy(name, backend, *args, **kwargs)

strategy dispatcher

class secretflow.ml.nn.fl.strategy_dispatcher.Dispatcher[source]#

Bases: object

Methods:

__init__()

register(name, cls)

dispatch(name, backend, *args, **kwargs)

__init__()[source]#
register(name, cls)[source]#
dispatch(name, backend, *args, **kwargs)[source]#
secretflow.ml.nn.fl.strategy_dispatcher.register_strategy(_cls=None, *, strategy_name=None, backend=None)[source]#

register new strategy

Parameters:
  • _cls

  • strategy_name – name of strategy

Returns:

secretflow.ml.nn.fl.strategy_dispatcher.dispatch_strategy(name, backend, *args, **kwargs)[source]#

strategy dispatcher

Parameters:
  • name – name of strategy, str

  • *args

  • **kwargs

Returns:
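Together, register_strategy and dispatch_strategy suggest a registry keyed by strategy name and backend. A self-contained sketch of that register/dispatch pattern (the _REGISTRY dict and the FedAvgW class body are illustrative, not SecretFlow internals):

```python
# Registry mapping (strategy_name, backend) -> strategy class.
_REGISTRY = {}

def register_strategy(_cls=None, *, strategy_name=None, backend=None):
    """Decorator registering a strategy class under (name, backend)."""
    def wrap(cls):
        _REGISTRY[(strategy_name or cls.__name__, backend)] = cls
        return cls
    # Support both @register_strategy and @register_strategy(...) usage.
    return wrap if _cls is None else wrap(_cls)

def dispatch_strategy(name, backend, *args, **kwargs):
    """Look up the registered class and instantiate it."""
    try:
        cls = _REGISTRY[(name, backend)]
    except KeyError:
        raise KeyError(f"no strategy {name!r} registered for backend {backend!r}")
    return cls(*args, **kwargs)

@register_strategy(strategy_name='fed_avg_w', backend='tensorflow')
class FedAvgW:
    def __init__(self, lr=0.1):
        self.lr = lr

strategy = dispatch_strategy('fed_avg_w', 'tensorflow', lr=0.05)
```

Keying on (name, backend) lets the same strategy name resolve to different implementations for the tensorflow and torch backends.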

secretflow.ml.nn.fl.utils#

Classes:

History(local_history, Dict[str, ...)

Functions:

metric_wrapper(func, *args, **kwargs)

optim_wrapper(func, *args, **kwargs)

class secretflow.ml.nn.fl.utils.History(local_history: Dict[str, Dict[str, List[float]]] = <factory>, local_detailed_history: Dict[str, Dict[str, List[secretflow.ml.nn.metrics.Metric]]] = <factory>, global_history: Dict[str, List[float]] = <factory>, global_detailed_history: Dict[str, List[secretflow.ml.nn.metrics.Metric]] = <factory>)[source]#

Bases: object

Attributes:

local_history

Examples: >>> { 'alice': {'loss': [0.46011224], 'accuracy': [0.8639647]}, 'bob': {'loss': [0.46011224], 'accuracy': [0.8639647]}, }

local_detailed_history

Examples: >>> { 'alice': { 'mean': [Mean()] }, 'bob': { 'mean': [Mean()] }, }

global_history

Examples: >>> { 'loss': [0.46011224], 'accuracy': [0.8639647] }

global_detailed_history

Examples: >>> { 'loss': [Loss(name='loss')], 'precision': [Precision(name='precision')], }

Methods:

__init__([local_history, ...])

record_local_history(party, metrics[, stage])

record_global_history(metrics[, stage])

local_history: Dict[str, Dict[str, List[float]]]#

Examples:

>>> {
...     'alice': {'loss': [0.46011224], 'accuracy': [0.8639647]},
...     'bob': {'loss': [0.46011224], 'accuracy': [0.8639647]},
... }

local_detailed_history: Dict[str, Dict[str, List[Metric]]]#

Examples:

>>> {
...     'alice': {'mean': [Mean()]},
...     'bob': {'mean': [Mean()]},
... }

global_history: Dict[str, List[float]]#

Examples:

>>> {'loss': [0.46011224], 'accuracy': [0.8639647]}

global_detailed_history: Dict[str, List[Metric]]#

Examples:

>>> {
...     'loss': [Loss(name='loss')],
...     'precision': [Precision(name='precision')],
... }

__init__(local_history: ~typing.Dict[str, ~typing.Dict[str, ~typing.List[float]]] = <factory>, local_detailed_history: ~typing.Dict[str, ~typing.Dict[str, ~typing.List[~secretflow.ml.nn.metrics.Metric]]] = <factory>, global_history: ~typing.Dict[str, ~typing.List[float]] = <factory>, global_detailed_history: ~typing.Dict[str, ~typing.List[~secretflow.ml.nn.metrics.Metric]] = <factory>) None#
record_local_history(party, metrics: List[Metric], stage='train')[source]#
record_global_history(metrics: List[Metric], stage='train')[source]#
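The documented attribute layout and record methods can be approximated with a small dataclass. In this sketch metrics are passed as a plain dict and keys are prefixed with the stage; both are assumptions made for illustration, since the real methods take a List[Metric]:

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class History:
    # Per-party metric series, e.g. {'alice': {'train_loss': [0.46]}}.
    local_history: Dict[str, Dict[str, List[float]]] = field(default_factory=dict)
    # Aggregated metric series shared by all parties.
    global_history: Dict[str, List[float]] = field(default_factory=dict)

    def record_local_history(self, party: str, metrics: Dict[str, float],
                             stage: str = 'train') -> None:
        party_hist = self.local_history.setdefault(party, {})
        for name, value in metrics.items():
            party_hist.setdefault(f'{stage}_{name}', []).append(value)

    def record_global_history(self, metrics: Dict[str, float],
                              stage: str = 'train') -> None:
        for name, value in metrics.items():
            self.global_history.setdefault(f'{stage}_{name}', []).append(value)

h = History()
h.record_local_history('alice', {'loss': 0.46, 'accuracy': 0.86})
h.record_global_history({'loss': 0.46})
```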
secretflow.ml.nn.fl.utils.metric_wrapper(func, *args, **kwargs)[source]#
secretflow.ml.nn.fl.utils.optim_wrapper(func, *args, **kwargs)[source]#
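Both wrappers take a constructor plus its arguments, which suggests deferred construction: each worker can later build its own metric or optimizer instance. A sketch of that pattern with functools.partial (assumed semantics; the Accuracy class is a hypothetical example metric):

```python
from functools import partial

def metric_wrapper(func, *args, **kwargs):
    """Defer construction: return a zero-arg factory for func(*args, **kwargs)."""
    return partial(func, *args, **kwargs)

# optim_wrapper follows the same deferred-construction pattern.
optim_wrapper = metric_wrapper

class Accuracy:
    def __init__(self, name='accuracy', threshold=0.5):
        self.name = name
        self.threshold = threshold

make_metric = metric_wrapper(Accuracy, threshold=0.7)
m = make_metric()  # each call yields a fresh, independently owned instance
```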