secretflow.device

secretflow.device.driver

Functions:

with_device(dev, *[, num_returns, ...])

Set up a wrapper that schedules a function on this device.

to(device, data)

Device object conversion.

reveal(func_or_object[, heu_encoder])

Get plaintext data from device.

wait(objects)

Wait for device objects until all are ready or an error occurs.

init([parties, address, cluster_config, ...])

Connect to an existing Ray cluster or start one and connect to it.

shutdown()

Disconnect the worker, and terminate processes started by secretflow.init().

secretflow.device.driver.with_device(dev: Device, *, num_returns: Optional[int] = None, static_argnames: Optional[Union[str, Iterable[str]]] = None)[source]

Set up a wrapper that schedules a function on this device.

Parameters:
  • dev (Device) – Target device.

  • num_returns (int) – Number of DeviceObjects returned.

  • static_argnames (Union[str, Iterable[str], None]) – See jax.jit() docstring.

Example

>>> import jax.numpy as jnp
>>> p1, spu = PYU(), SPU()
>>> # dynamic decorator
>>> x = with_device(p1)(load_data)('alice.csv')
>>> # static decorator
>>> @with_device(spu)
... def selu(x, alpha=1.67, lmbda=1.05):
...     return lmbda * jnp.where(x > 0, x, alpha * jnp.exp(x) - alpha)
>>> x_ = x.to(spu)
>>> y = selu(x_)

secretflow.device.driver.to(device: Device, data: Any)[source]

Device object conversion.

Parameters:
  • device (Device) – Target device.

  • data (Any) – DeviceObject or plaintext data.

Returns:

Target device object.

Return type:

DeviceObject

secretflow.device.driver.reveal(func_or_object, heu_encoder=None)[source]

Get plaintext data from device.

NOTE: Use this function with extreme caution, as it may cause privacy leaks. In SecretFlow, we recommend that data flow between devices and be revealed to the driver only rarely. Use this function only when control flow depends on the data.

Parameters:
  • func_or_object – A callable or any Python object that contains device objects.

  • heu_encoder – An HEU Encoder or EncoderParams, used to replace the default encoder from the config.

secretflow.device.driver.wait(objects: Any)[source]

Wait for device objects until all are ready or an error occurs.

NOTE: This function uses reveal internally but does not make the result public, so it is safe to use for synchronization.

Parameters:

objects – A structure of device objects.

Example

>>> spu = sf.SPU()
>>> spu_value = spu(some_function)(some_value)
>>> alice_value = spu_value.to(alice)
>>> # synchronization
>>> sf.wait(alice(some_save_value_function_locally)(alice_value))
secretflow.device.driver.init(parties: Optional[Union[str, List[str]]] = None, address: Optional[str] = None, cluster_config: Optional[Dict] = None, num_cpus: Optional[int] = None, log_to_driver=True, omp_num_threads: Optional[int] = None, logging_level: str = 'info', cross_silo_grpc_retry_policy: Optional[Dict] = None, cross_silo_send_max_retries: Optional[int] = None, cross_silo_messages_max_size_in_bytes: Optional[int] = None, cross_silo_serializing_allowed_list: Optional[Dict] = None, cross_silo_timeout_in_seconds: int = 3600, exit_on_failure_cross_silo_sending: bool = True, enable_waiting_for_other_parties_ready: bool = True, tls_config: Optional[Dict[str, Dict]] = None, auth_manager_config: Optional[Dict] = None, party_key_pair: Optional[Dict[str, Dict]] = None, tee_simulation: bool = False, **kwargs)[source]

Connect to an existing Ray cluster or start one and connect to it.

Parameters:
  • parties – The parties this node represents, e.g. ‘alice’ or [‘alice’, ‘bob’, ‘carol’]. If parties are provided, simulation mode is enabled, which means a single Ray cluster simulates multiple parties. If you want to run SecretFlow in production mode, please keep it None.

  • address – The address of the Ray cluster to connect to. If this address is not provided, a local Ray instance is started.

  • cluster_config

    The cluster config for multiple SecretFlow parties. Must be provided if you run SecretFlow in cluster mode. E.g.

    # For alice
    {
        'parties': {
            'alice': {
                # The address for other parties.
                'address': '127.0.0.1:10001',
                # (Optional) the listen address, the `address` will
                # be used if not provided.
                'listen_addr': '0.0.0.0:10001'
            },
            'bob': {
                # The address for other parties.
                'address': '127.0.0.1:10002',
                # (Optional) the listen address, the `address` will
                # be used if not provided.
                'listen_addr': '0.0.0.0:10002'
            },
        },
        'self_party': 'alice'
    }
    
    # For bob
    {
        'parties': {
            'alice': {
                # The address for other parties.
                'address': '127.0.0.1:10001',
                # (Optional) the listen address, the `address` will
                # be used if not provided.
                'listen_addr': '0.0.0.0:10001'
            },
            'bob': {
                # The address for other parties.
                'address': '127.0.0.1:10002',
                # (Optional) the listen address, the `address` will
                # be used if not provided.
                'listen_addr': '0.0.0.0:10002'
            },
        },
        'self_party': 'bob'
    }
    

  • num_cpus – Number of CPUs the user wishes to assign to each raylet.

  • log_to_driver – Whether to direct the output of worker processes on all nodes to the driver.

  • omp_num_threads – Sets the environment variable OMP_NUM_THREADS. Takes effect only when address is None.

  • logging_level – Optional; works only in production mode. The logging level: one of debug, info, warning, error, or critical (case-insensitive).

  • cross_silo_grpc_retry_policy

    Optional; works only in production mode. A dict that describes the retry policy for cross-silo RPC calls. If None, the following default retry policy is used. For more details, refer to retry-policy.

    {
        "maxAttempts": 4,
        "initialBackoff": "0.1s",
        "maxBackoff": "1s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": [
            "UNAVAILABLE"
        ]
    }
    

  • cross_silo_send_max_retries – Optional; works only in production mode. The maximum number of retries for sending data across silos.

  • cross_silo_messages_max_size_in_bytes – int; works only in production mode. The maximum number of bytes for one transaction. The size must be strictly less than 2GB, i.e. 2 * (1024 ** 3).

  • cross_silo_serializing_allowed_list

    Optional; works only in production mode. A dict that describes the packages or classes allowed for cross-silo serialization and deserialization. It is used to prevent pickle deserialization attacks across silos. E.g.

    {
        "numpy.core.numeric": ["*"],
        "numpy": ["dtype"],
    }
    

  • cross_silo_timeout_in_seconds – The timeout in seconds of a cross-silo RPC call. It’s 3600 by default.

  • exit_on_failure_cross_silo_sending – Optional; works only in production mode. Whether to exit when cross-silo sending fails. If True, a SIGTERM is sent to the current process if sending cross-silo data fails.

  • enable_waiting_for_other_parties_ready – Wait for the other parties to be ready if True.

  • tls_config

    Optional. A dict that describes the TLS certificate and key information. E.g.

    {
        'key': 'server key in pem.',
        'cert': 'server certificate in pem.',
        'ca_cert': 'root ca certificate of other parties.'
    }
    

  • auth_manager_config

    Optional. A dict that describes the configuration of the authority manager service. The authority manager helps manage the authority of TEE data. This parameter is for TEE users only. For example:

    {
        'host': 'host of authority manager service.',
        'mr_enclave': 'mr_enclave of authority manager.',
        'ca_cert': 'optional, root ca certificate of authority manager.'
    }
    

  • party_key_pair

    Optional. A dict that describes the asymmetric key pair. This is required for a party that wants to send data to a TEEU. E.g.

    # For alice
    {
        'alice': {
            'public_key': 'RSA public key of alice in pem.',
            'private_key': 'RSA private key of alice in pem.',
        }
    }
    
    # For bob
    {
        'bob': {
            'public_key': 'RSA public key of bob in pem.',
            'private_key': 'RSA private key of bob in pem.',
        }
    }
    

  • tee_simulation – Optional. Enables TEE simulation if True. When simulation is enabled, remote attestation for the auth manager is skipped. This is for testing only; keep it False in production.

  • **kwargs – see ray.init() parameters.
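
The cluster_config layout described above can be sketched as follows. This is a minimal illustration, not SecretFlow code: build_cluster_config is a hypothetical helper, and the addresses are the placeholder values from the example above. It assumes every party listens on 0.0.0.0 at the same port that it advertises.

```python
from typing import Dict


def build_cluster_config(addresses: Dict[str, str], self_party: str) -> dict:
    """Hypothetical helper: build the `cluster_config` dict for one party."""
    if self_party not in addresses:
        raise ValueError(f"unknown party: {self_party}")
    parties = {}
    for name, address in addresses.items():
        port = address.rsplit(":", 1)[1]
        parties[name] = {
            # Address that other parties use to reach this party.
            "address": address,
            # Optional listen address; `address` is used if it is omitted.
            "listen_addr": f"0.0.0.0:{port}",
        }
    return {"parties": parties, "self_party": self_party}


addresses = {"alice": "127.0.0.1:10001", "bob": "127.0.0.1:10002"}
alice_config = build_cluster_config(addresses, "alice")
bob_config = build_cluster_config(addresses, "bob")

# Every party shares the same 'parties' table; only 'self_party' differs.
assert alice_config["parties"] == bob_config["parties"]
print(alice_config["self_party"], bob_config["self_party"])
```

Each party would then pass its own dict as the cluster_config argument of init, together with the address of its own Ray cluster.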

secretflow.device.driver.shutdown()[source]

Disconnect the worker, and terminate processes started by secretflow.init().

This runs automatically when a Python process that uses Ray exits. It is safe to call this twice in a row. The primary use case for this function is to clean up state between tests.

secretflow.device.global_state

Functions:

set_self_party(self_party)

self_party()

set_party_certs(party_certs)

party_certs()

set_party_key_pairs(party_key_pairs)

party_key_pairs()

set_tee_simulation(tee_simulation)

tee_simulation()

set_auth_manager_host(auth_host)

auth_manager_host()

set_auth_manager_mr_enclave(mr_enclave)

auth_manager_mr_enclave()

set_auth_manager_ca_cert(ca_cert)

auth_manager_ca_cert()

Classes:

PartyCert([party_name, key, cert, root_ca_cert])

PartyKeyPair([party_name, public_key, ...])

secretflow.device.global_state.set_self_party(self_party: str)[source]
secretflow.device.global_state.self_party() → str[source]
class secretflow.device.global_state.PartyCert(party_name: str = None, key: str = None, cert: str = None, root_ca_cert: str = None)[source]

Bases: object

Attributes:

party_name

key

Server key in pem.

cert

Server cert in pem.

root_ca_cert

Root CA certificate of clients.

Methods:

__init__([party_name, key, cert, root_ca_cert])

party_name: str = None
key: str = None

Server key in pem.

cert: str = None

Server cert in pem.

root_ca_cert: str = None

Root CA certificate of clients.

__init__(party_name: Optional[str] = None, key: Optional[str] = None, cert: Optional[str] = None, root_ca_cert: Optional[str] = None) → None
secretflow.device.global_state.set_party_certs(party_certs: Dict[str, PartyCert])[source]
secretflow.device.global_state.party_certs() → Dict[str, PartyCert][source]
class secretflow.device.global_state.PartyKeyPair(party_name: str = None, public_key: str = None, private_key: str = None)[source]

Bases: object

Attributes:

party_name

public_key

The RSA public key in pem.

private_key

The RSA private key in pem.

Methods:

__init__([party_name, public_key, private_key])

party_name: str = None
public_key: str = None

The RSA public key in pem.

private_key: str = None

The RSA private key in pem.

__init__(party_name: Optional[str] = None, public_key: Optional[str] = None, private_key: Optional[str] = None) → None
secretflow.device.global_state.set_party_key_pairs(party_key_pairs: Dict[str, PartyKeyPair])[source]
secretflow.device.global_state.party_key_pairs() → Dict[str, PartyKeyPair][source]
secretflow.device.global_state.set_tee_simulation(tee_simulation: bool)[source]
secretflow.device.global_state.tee_simulation() → bool[source]
secretflow.device.global_state.set_auth_manager_host(auth_host: str)[source]
secretflow.device.global_state.auth_manager_host() → str[source]
secretflow.device.global_state.set_auth_manager_mr_enclave(mr_enclave: str)[source]
secretflow.device.global_state.auth_manager_mr_enclave() → str[source]
secretflow.device.global_state.set_auth_manager_ca_cert(ca_cert: str)[source]
secretflow.device.global_state.auth_manager_ca_cert() → str[source]

secretflow.device.proxy

Functions:

proxy(device_object_type[, max_concurrency, ...])

Define a device class which should accept DeviceObject as method parameters and return DeviceObject.

secretflow.device.proxy.proxy(device_object_type: Type[DeviceObject], max_concurrency: Optional[int] = None, _simulation_max_concurrency: Optional[int] = None)[source]

Define a device class which should accept DeviceObject as method parameters and return DeviceObject.

This proxy function mainly does the following work:

1. Add an additional parameter device: Device to the __init__ method.
2. Wrap class methods so that DeviceObject instances can be passed as parameters; they must be on the same device as the class instance.
3. Return the number of DeviceObject instances indicated by the return annotation of each class method.

@proxy(PYUObject)
class Model:
    def __init__(self, builder):
        self.weights = builder()

    def build_dataset(self, x, y):
        self.dataset_x = x
        self.dataset_y = y

    def get_weights(self) -> np.ndarray:
        return self.weights

    def train_step(self, step) -> Tuple[np.ndarray, int]:
        return self.weights, 100

alice = PYU('alice')
model = Model(builder, device=alice)
x, y = alice(load_data)()
model.build_dataset(x, y)
w = model.get_weights()
w, n = model.train_step(10)

Parameters:
  • device_object_type (Type[DeviceObject]) – DeviceObject type, e.g. PYUObject.

  • max_concurrency (int) – Actor threadpool size.

  • _simulation_max_concurrency (int) – Actor threadpool size only for simulation (single controller mode). This argument takes effect only when max_concurrency is None.

Returns:

Wrapper function.

Return type:

Callable
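
The rule that the return annotation determines how many DeviceObject values a method yields can be illustrated with plain Python introspection. The sketch below is not SecretFlow's implementation: count_returns is a hypothetical helper, and it assumes that a missing or None annotation means no return value, a Tuple[...] annotation means one value per element, and anything else means a single value.

```python
import inspect
from typing import Tuple, get_args, get_origin


def count_returns(method) -> int:
    """Hypothetical helper: number of values implied by a return annotation."""
    annotation = inspect.signature(method).return_annotation
    # Assumption: no annotation (or an explicit None) means nothing is returned.
    if annotation is inspect.Signature.empty or annotation is None:
        return 0
    # A Tuple[...] annotation counts one returned value per element type.
    if get_origin(annotation) is tuple:
        return len(get_args(annotation))
    return 1


class Model:
    def build_dataset(self, x, y):
        self.dataset = (x, y)

    def get_weights(self) -> list:
        return [0.0]

    def train_step(self, step) -> Tuple[list, int]:
        return [0.0], 100


print(count_returns(Model.build_dataset))  # 0
print(count_returns(Model.get_weights))    # 1
print(count_returns(Model.train_step))     # 2
```

Under these assumptions, the Model example above would give no value for build_dataset, one for get_weights, and two for train_step, matching the unpacking w, n = model.train_step(10).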