secretflow.device#
secretflow.device.driver#
Functions:
|
Set up a wrapper for scheduling function to this device. |
|
Device object conversion. |
|
Get plaintext data from device. |
|
Wait for device objects until all are ready or error occurrency. |
|
Connect to an existing Ray cluster or start one and connect to it. |
|
Disconnect the worker, and terminate processes started by secretflow.init(). |
- secretflow.device.driver.with_device(dev: Device, *, num_returns: Optional[int] = None, static_argnames: Optional[Union[str, Iterable[str]]] = None)[源代码]#
Set up a wrapper for scheduling function to this device.
- 参数:
dev (Device) – Target device.
num_returns (int) – Number of returned DeviceObject.
static_argnames (Union[str, Iterable[str], None]) – See
jax.jit()
docstring.
示例
>>> p1, spu = PYU(), SPU() >>> # dynamic decorator >>> x = with_device(p1)(load_data)('alice.csv') >>> # static decorator >>> @with_device(spu) >>> def selu(x, alpha=1.67, lmbda=1.05): >>> return lmbda * jnp.where(x > 0, x, alpha * jnp.exp(x) - alpha) >>> x_ = x.to(spu) >>> y = selu(x_)
- secretflow.device.driver.to(device: Device, data: Any)[源代码]#
Device object conversion.
- 参数:
device (Device) – Target device.
data (Any) – DeviceObject or plaintext data.
- 返回:
Target device object.
- 返回类型:
- secretflow.device.driver.reveal(func_or_object, heu_encoder=None)[源代码]#
Get plaintext data from device.
NOTE: Use this function with extreme caution, as it may cause privacy leaks. In SecretFlow, we recommend that data should flow between different devices and rarely revealed to driver. Only use this function when data dependency control flow occurs.
- 参数:
func_or_object – May be callable or any Python objects which contains Device objects.
heu_encoder – Can be heu Encoder or EncoderParams. This is used to replace the default encoder from config
- secretflow.device.driver.wait(objects: Any)[源代码]#
Wait for device objects until all are ready or error occurrency.
NOTE: This function uses reveal internally, but won’t reveal result to public. So this is secure to use this as synchronization semantics.
- 参数:
objects – struct of device objects.
示例
>>> spu = sf.SPU() >>> spu_value = spu(some_function)(some_value) >>> alice_value = spu_value.to(alice) >>> # synchronization >>> sf.wait(alice(some_save_value_function_locally)(alice_value))
- secretflow.device.driver.init(parties: Optional[Union[str, List[str]]] = None, address: Optional[str] = None, cluster_config: Optional[Dict] = None, num_cpus: Optional[int] = None, log_to_driver=True, omp_num_threads: Optional[int] = None, logging_level: str = 'info', cross_silo_grpc_retry_policy: Optional[Dict] = None, cross_silo_send_max_retries: Optional[int] = None, cross_silo_messages_max_size_in_bytes: Optional[int] = None, cross_silo_serializing_allowed_list: Optional[Dict] = None, cross_silo_timeout_in_seconds: int = 3600, exit_on_failure_cross_silo_sending: bool = True, enable_waiting_for_other_parties_ready: bool = True, tls_config: Optional[Dict[str, Dict]] = None, auth_manager_config: Optional[Dict] = None, party_key_pair: Optional[Dict[str, Dict]] = None, tee_simulation: bool = False, **kwargs)[源代码]#
Connect to an existing Ray cluster or start one and connect to it.
- 参数:
parties – parties this node represents, e.g: ‘alice’, [‘alice’, ‘bob’, ‘carol’]. If parties are provided, then simulation mode will be enabled, which means a single ray cluster will simulate as multi parties. If you want to run SecretFlow in production mode, plean keep it None.
address – The address of the Ray cluster to connect to. If this address is not provided, then a local ray will be started.
cluster_config –
the cluster config of multi SecretFlow parties. Must be provided if you run SecretFlow in cluster mode. E.g.
# For alice { 'parties': { 'alice': { # The address for other parties. 'address': '127.0.0.1:10001', # (Optional) the listen address, the `address` will # be used if not prodived. 'listen_addr': '0.0.0.0:10001' }, 'bob': { # The address for other parties. 'address': '127.0.0.1:10002', # (Optional) the listen address, the `address` will # be used if not prodived. 'listen_addr': '0.0.0.0:10002' }, }, 'self_party': 'alice' } # For bob { 'parties': { 'alice': { # The address for other parties. 'address': '127.0.0.1:10001', # (Optional) the listen address, the `address` will # be used if not prodived. 'listen_addr': '0.0.0.0:10001' }, 'bob': { # The address for other parties. 'address': '127.0.0.1:10002', # (Optional) the listen address, the `address` will # be used if not prodived. 'listen_addr': '0.0.0.0:10002' }, }, 'self_party': 'bob' }
num_cpus – Number of CPUs the user wishes to assign to each raylet.
log_to_driver – Whether direct output of worker processes on all nodes to driver.
omp_num_threads – set environment variable OMP_NUM_THREADS. It works only when address is None.
logging_level – optional; works only in production mode. the logging level, could be debug, info, warning, error, critical, not case sensititive.
cross_silo_grpc_retry_policy –
optional, works only in production mode. a dict descibes the retry policy for cross silo rpc call. If None, the following default retry policy will be used. More details please refer to retry-policy.
{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMultiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] }
cross_silo_send_max_retries – optional, works only in production mode. the max retries for sending data cross silo.
cross_silo_messages_max_size_in_bytes – int, works only in production mode. the max number of byte for one transaction. The size must be strictly less than 2GB, i.e. 2 * (1024 ** 3).
cross_silo_serializing_allowed_list –
optional, works only in production mode. A dict describes the package or class list allowed for cross-silo serializing(deserializating). It’s used for avoiding pickle deserializing execution attack when crossing silos. E.g.
{ "numpy.core.numeric": ["*"], "numpy": ["dtype"], }
cross_silo_timeout_in_seconds – The timeout in seconds of a cross-silo RPC call. It’s 3600 by default.
exit_on_failure_cross_silo_sending – optional, works only in production mode. whether exit when failure on cross-silo sending. If True, a SIGTERM will be signaled to self if failed to sending cross-silo data.
enable_waiting_for_other_parties_ready – wait for other parties ready if True.
tls_config –
optional, a dict describes the tls certificate and key infomations. E.g.
{ 'key': 'server key in pem.' 'cert': 'server certificate in pem.', 'ca_cert': 'root ca certificate of other parties.' }
auth_manager_config –
optional, a dict describes the config about authority manager service. Authority manager helps manage the authority of TEE data. This parameter is for TEE users only. An example,
{ 'host': 'host of authority manager service.' 'mr_enclave': 'mr_enclave of authority manager.', 'ca_cert': 'optional, root ca certificate of authority manager.' }
party_key_pair –
optional, a dict describes the asymmetric key pair. This is required for party who wants to send data to TEEU. E.g.
# For alice { 'alice': { 'public_key': 'RSA public key of alice in pem.', 'private_key': 'RSA private key of alice in pem.', } } # For bob { 'bob': { 'public_key': 'RSA public key of bob in pem.', 'private_key': 'RSA private key of bob in pem.', } }
tee_simulation – optional, enable TEE simulation if True. When simulation is enabled, the remote attestation for auth manager will be ignored. This is for test only and keep it False when for production.
**kwargs – see
ray.init()
parameters.
- secretflow.device.driver.shutdown()[源代码]#
Disconnect the worker, and terminate processes started by secretflow.init().
This will automatically run at the end when a Python process that uses Ray exits. It is ok to run this twice in a row. The primary use case for this function is to cleanup state between tests.
secretflow.device.global_state#
Functions:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Classes:
|
|
|
- class secretflow.device.global_state.PartyCert(party_name: str = None, key: str = None, cert: str = None, root_ca_cert: str = None)[源代码]#
基类:
object
Attributes:
Server key in pem.
Server cert in pem.
Root CA certifcate of clients.
Methods:
__init__
([party_name, key, cert, root_ca_cert])- party_name: str = None#
- key: str = None#
Server key in pem.
- cert: str = None#
Server cert in pem.
- root_ca_cert: str = None#
Root CA certifcate of clients.
- __init__(party_name: Optional[str] = None, key: Optional[str] = None, cert: Optional[str] = None, root_ca_cert: Optional[str] = None) None #
- class secretflow.device.global_state.PartyKeyPair(party_name: str = None, public_key: str = None, private_key: str = None)[源代码]#
基类:
object
Attributes:
The RSA public key in pem.
The RSA private key in pem.
Methods:
__init__
([party_name, public_key, private_key])- party_name: str = None#
- public_key: str = None#
The RSA public key in pem.
- private_key: str = None#
The RSA private key in pem.
- __init__(party_name: Optional[str] = None, public_key: Optional[str] = None, private_key: Optional[str] = None) None #
- secretflow.device.global_state.set_party_key_pairs(party_key_pairs: Dict[str, PartyKeyPair])[源代码]#
- secretflow.device.global_state.party_key_pairs() Dict[str, PartyKeyPair] [源代码]#
secretflow.device.link#
Functions:
|
|
|
|
|
Send message to the target device. |
|
Send message to the target device. |
|
Receive messages from the source device. |
|
Receive messages from the source device. |
|
Classes:
|
|
|
A helper class for communication inside actor between several actors. |
- secretflow.device.link.send_to_clients(name, value, version)[源代码]#
- Send message to the target device.
this function is non-blocking.
- 参数:
name – message name
value – message value
version – message version, used to distinguish between different training rounds
- secretflow.device.link.send_to_server(name, value, version)[源代码]#
- Send message to the target device.
this function is non-blocking.
- 参数:
name – message name
value – message value
version – message version, used to distinguish between different training rounds
- secretflow.device.link.recv_from_clients(name, version)[源代码]#
- Receive messages from the source device.
this function is blocking
- 参数:
name – message name
version – TODO: What is the purpose of the version parameter?
- 返回:
The received message
- secretflow.device.link.recv_from_server(name, version)[源代码]#
- Receive messages from the source device.
this function is blocking
- 参数:
name – message name
version – message version, used to distinguish between different training rounds
- 返回:
The received message
- class secretflow.device.link.FedCommunicator(partners: List[PYU])[源代码]#
基类:
Communicator
Methods:
__init__
(partners)send
(dest, data, key)recv
(src, keys)
- class secretflow.device.link.RayCommunicator[源代码]#
基类:
Communicator
Methods:
__init__
()links
(links)send
(dest, data, key)recv
(src, keys)
- class secretflow.device.link.Link(device: PYU, key_prefix: str = '')[源代码]#
基类:
object
A helper class for communication inside actor between several actors.
You should not use this class directly but inherit it and decorate your child class with
proxy()
.示例
>>> from secretflow.device import proxy >>> from seceretflow.device.link import Link, init_link >>> >>> @proxy >>> class PS(Link): >>> def run(self): >>> pass >>> >>> >>> @proxy >>> class Client(Link): >>> def run(self): >>> pass >>> >>> ps = PS() >>> clients = [Client() for i in range(2)] >>> init_link(ps, clients) >>> for client in clients: >>> init_link(client, ps) >>>
Methods:
__init__
(device[, key_prefix])Initialize
initialize
(comm_or_links)send
(name, value, dst_device[, step_id])Send message to target device.
recv
(name, src_device[, step_id])Receive messages from the source device.
Attributes:
- __init__(device: PYU, key_prefix: str = '')[源代码]#
Initialize
- 参数:
device – where this Link instance located, PYU
- initialize(comm_or_links: Union[Communicator, Dict[PYU, ActorHandle]])[源代码]#
- property clients#
- property server#
- send(name: str, value: Any, dst_device: Union[PYU, List[PYU]], step_id: int = 0)[源代码]#
- Send message to target device.
this function is non-blocking
- 参数:
name – message name
value – message value
dst_device – target device(s), can be a single device or a list of devices
step_id – A process-level unique identifier to identify the communication
- recv(name: str, src_device: Union[PYU, List[PYU]], step_id: int = 0) Any [源代码]#
- Receive messages from the source device.
this function is blocking
- 参数:
name – The message name
src_device – source device(s), can be a single device or a list of devices
step_id – A process-level unique identifier to identify the communication
- 返回:
The received message
secretflow.device.proxy#
Functions:
|
Define a device class which should accept DeviceObject as method parameters and return DeviceObject. |
- secretflow.device.proxy.proxy(device_object_type: Type[DeviceObject], max_concurrency: Optional[int] = None, _simulation_max_concurrency: Optional[int] = None)[源代码]#
Define a device class which should accept DeviceObject as method parameters and return DeviceObject.
This proxy function mainly does the following work: 1. Add an additional parameter device: Device to init method __init__. 2. Wrap class methods, allow passing DeviceObject as parameters, which must be on the same device as the class instance. 3. According to the return annotation of class methods, return the corresponding number of DeviceObject.
@proxy(PYUObject) class Model: def __init__(self, builder): self.weights = builder() def build_dataset(self, x, y): self.dataset_x = x self.dataset_y = y def get_weights(self) -> np.ndarray: return self.weights def train_step(self, step) -> Tuple[np.ndarray, int]: return self.weights, 100 alice = PYU('alice') model = Model(builder, device=alice) x, y = alice(load_data)() model.build_dataset(x, y) w = model.get_weights() w, n = model.train_step(10)
- 参数:
device_object_type (Type[DeviceObject]) – DeviceObject type, eg. PYUObject.
max_concurrency (int) – Actor threadpool size.
_simulation_max_concurrencty (int) – Actor threadpool size only for simulation (single controller mode). This argument takes effect only when max_concurrency is None.
- 返回:
Wrapper function.
- 返回类型:
Callable