Linear Regression Models#
Linear regression models are a simple yet very widely used class of statistical models. Under the protection of multi-party secure computation protocols, SecretFlow implements provably secure linear regression and binary logistic regression via the mini-batch gradient descent method.
The matrix form of the mini-batch gradient descent update is as follows:
Standard:
\({\theta^{t+1}} \leftarrow {\theta^t} - \frac{\alpha}{m} {X}^T ({X}{\theta^t} - {y})\)
With L2 regularization:
\({\theta^{t+1}} \leftarrow {\theta^t} - \frac{\alpha}{m} ({X}^T ({X}{\theta^t} - {y}) + \lambda {w^t})\) where \(w^t_0 = 0,\ w^t_j = \theta^t_j,\ j = 1, \cdots, n\)
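For intuition, here is a minimal plaintext NumPy sketch of one update step (the name sgd_step is illustrative only, not part of the SecretFlow API; in SecretFlow the same arithmetic runs on secret-shared values):
import numpy as np

def sgd_step(theta, X, y, alpha, lam=0.0):
    # one mini-batch update; X is (m, n+1) with a leading all-ones intercept column
    m = X.shape[0]
    grad = X.T @ (X @ theta - y)  # X^T (X theta - y)
    w = theta.copy()
    w[0] = 0.0                    # the intercept is not regularized
    return theta - alpha / m * (grad + lam * w)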
SecretFlow provides two provably secure implementations, SS-SGD and HESS-SGD:
SS-SGD: short for secret sharing SGD training; it computes the gradient using a secret sharing protocol.
HESS-SGD: short for HE & secret sharing SGD training; it computes the gradient using homomorphic encryption.
Secret sharing is sensitive to bandwidth and latency, while homomorphic encryption schemes consume more CPU power.
In a LAN / 10-gigabit environment, SS finishes training faster; in bandwidth-limited, high-latency environments, HE can speed up training.
Apart from the gradient computation, the two implementations share essentially the same logic and security assumptions. Choose whichever implementation fits your CPU/bandwidth environment.
SS-SGD#
SSRegression()
Implements linear regression and binary logistic regression training on vertically partitioned datasets under a secret sharing protocol, solved by mini-batch stochastic gradient descent.
Linear regression fits a linear model with coefficients w = (w1, …, wp) that minimizes the residual sum of squares between the observed labels in the dataset and the labels predicted by the linear approximation.
Logistic regression is a linear model for classification, also known in the literature as logit regression, maximum-entropy classification (MaxEnt), or the log-linear classifier. A logistic function converts the linear prediction into a probability for each sample. The method also provides an optional L2 regularization term to prevent overfitting.
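Concretely, in the notation of the update formulas above, the predicted probability and the L2-regularized cross-entropy objective take the standard forms \(\hat{y} = \sigma(X\theta) = \frac{1}{1 + e^{-X\theta}}\) and \(J(\theta) = -\frac{1}{m} \sum_{i=1}^{m} \left[ y_i \log \hat{y}_i + (1 - y_i) \log (1 - \hat{y}_i) \right] + \frac{\lambda}{2m} \sum_{j=1}^{n} \theta_j^2\), with the intercept excluded from the penalty, consistent with \(w^t_0 = 0\) above.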
Example#
This example runs in single-node standalone mode. For cluster-mode deployment, see: Deployment
API details: SSRegression()
import sys
import time
import logging
import numpy as np
import spu
import secretflow as sf
from secretflow.data.split import train_test_split
from secretflow.device.driver import wait, reveal
from secretflow.data import FedNdarray, PartitionWay
from secretflow.ml.linear.ss_sgd import SSRegression
from sklearn.metrics import roc_auc_score, accuracy_score, classification_report
# init log
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# init all nodes in local Standalone Mode.
sf.init(['alice', 'bob', 'carol'], address='local')
# init PYU, the Python Processing Unit; it processes plaintext within each node.
alice = sf.PYU('alice')
bob = sf.PYU('bob')
carol = sf.PYU('carol')
# init SPU, the Secure Processing Unit;
# it processes ciphertext under the protection of a multi-party secure computation protocol
spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob', 'carol']))
# read data in each party
def read_x(start, end):
    # use breast_cancer as example
    from sklearn.datasets import load_breast_cancer
    from sklearn.preprocessing import StandardScaler
    x = load_breast_cancer()['data']
    # LR's train dataset must be standardized or normalized
    scaler = StandardScaler()
    x = scaler.fit_transform(x)
    return x[:, start:end]
def read_y():
    from sklearn.datasets import load_breast_cancer
    return load_breast_cancer()['target']
# alice / bob / carol each hold one third of the features of the data
# read_x is executed locally on each node.
v_data = FedNdarray(
    partitions={
        alice: alice(read_x)(0, 10),
        bob: bob(read_x)(10, 20),
        carol: carol(read_x)(20, 30),
    },
    partition_way=PartitionWay.VERTICAL,
)
# Y label belongs to alice
label_data = FedNdarray(
    partitions={alice: alice(read_y)()},
    partition_way=PartitionWay.VERTICAL,
)
# wait for IO to finish
wait([p.data for p in v_data.partitions.values()])
wait([p.data for p in label_data.partitions.values()])
# split train data and test data
random_state = 1234
split_factor = 0.8
v_train_data, v_test_data = train_test_split(v_data, train_size=split_factor, random_state=random_state)
v_train_label, v_test_label = train_test_split(label_data, train_size=split_factor, random_state=random_state)
# run SS-SGD
# SSRegression uses the SPU to fit the model.
model = SSRegression(spu)
start = time.time()
model.fit(
    v_train_data,  # x
    v_train_label,  # y
    5,  # epochs
    0.3,  # learning_rate
    32,  # batch_size
    't1',  # sig_type
    'logistic',  # reg_type
    'l2',  # penalty
    0.1,  # l2_norm
)
logging.info(f"train time: {time.time() - start}")
# run prediction
start = time.time()
# the result is now stored in the SPU as ciphertext
spu_yhat = model.predict(v_test_data)
# reveal for auc, acc and classification report test.
yhat = reveal(spu_yhat)
logging.info(f"predict time: {time.time() - start}")
y = reveal(v_test_label.partitions[alice])
# get the area under curve (AUC) score of the classification
logging.info(f"auc: {roc_auc_score(y, yhat)}")
binary_class_results = np.where(yhat > 0.5, 1, 0)
# get the accuracy score of classification
logging.info(f"acc: {accuracy_score(y, binary_class_results)}")
# get the report of classification
print("classification report:")
print(classification_report(y, binary_class_results))
Algorithm Implementation#
Detailed flow of logistic regression:
Taking binary classification as an example, the main flow is as follows:
Step 1: Initialize the dataset
Each data provider secret-shares its dataset into the ciphertext domain, where the shares are concatenated into X.
The holder of Y secret-shares Y into the ciphertext domain.
Initialize w to the configured initial value under secret sharing.
The dataset must satisfy X.rows > X.cols: 1) with too few samples the model will not converge; 2) Y could be leaked.
Step 2: Run mini-batch gradient descent, repeating the following steps until the target number of iterations is reached:
Step 2.1: Compute the prediction pred = sigmoid(batch_x * w). The sigmoid can be approximated by a Taylor expansion, a piecewise function, a square-root-based approximation, etc.
Step 2.2: Compute err = pred - y
Step 2.3: Compute the gradient grad = batch_x.transpose() * err
Step 2.4: If the L2 penalty is used, update the gradient grad = grad + w' * l2_norm, where the intercept term of w' is 0
Step 2.5: Update the weights w = w - (grad * learning_rate / batch_size)
Step 3: Output the model. At this point w is still in secret sharing form. Depending on your needs, reveal(w) can output it as plaintext, or the secret shares can be saved directly.
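For reference, the steps above map onto the following plaintext NumPy mock (a sketch only: every array here is plaintext, whereas in SS-SGD each one is secret-shared; sigmoid_t1 assumes the first-order Taylor choice among the approximations listed in Step 2.1):
import numpy as np

def sigmoid_t1(x):
    # first-order Taylor approximation of sigmoid at 0: sigmoid(x) ~= 0.5 + 0.25 x
    return 0.5 + 0.25 * x

def fit_logistic_mock(X, y, epochs=5, learning_rate=0.3, batch_size=32, l2_norm=0.0):
    m, n = X.shape
    X = np.hstack([np.ones((m, 1)), X])  # intercept column
    w = np.zeros(n + 1)                  # Step 1: initialize w
    for _ in range(epochs):              # Step 2: mini-batch gradient descent
        for i in range(0, m, batch_size):
            bx, by = X[i:i + batch_size], y[i:i + batch_size]
            err = sigmoid_t1(bx @ w) - by           # Steps 2.1 / 2.2
            grad = bx.T @ err                       # Step 2.3
            wp = w.copy()
            wp[0] = 0.0                             # Step 2.4: intercept term of w' is 0
            grad = grad + wp * l2_norm
            w = w - grad * learning_rate / len(by)  # Step 2.5
    return w                             # Step 3: here w would still be secret-shared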
Security Analysis#
X, Y and W stay in secret sharing form throughout the joint computation, and no reveal operation occurs during training, so no plaintext information can be inferred from the data exchanged during the computation.
HESS-SGD#
HESS-SGD module: HESSLogisticRegression()
Implements provably secure logistic regression with a hybrid of homomorphic encryption and secret sharing.
The biggest difference from SS-SGD is that the gradient computation, which dominates the communication cost in SS-SGD, is replaced by purely local homomorphic computation. Due to the asymmetric nature of homomorphic encryption, HESS-SGD currently supports only 2-party computation. The implementation follows <When Homomorphic Encryption Marries Secret Sharing: Secure Large-Scale Sparse Logistic Regression and Applications in Risk Control>, with some additional engineering optimizations.
Example#
This example runs in single-node standalone mode. For cluster-mode deployment, see: Deployment
API details: HESSLogisticRegression()
import sys
import time
import logging
import numpy as np
import secretflow as sf
from secretflow.data.split import train_test_split
from secretflow.device.driver import wait, reveal
from secretflow.data import FedNdarray, PartitionWay
from secretflow.ml.linear.hess_sgd import HESSLogisticRegression
from sklearn.metrics import roc_auc_score, accuracy_score, classification_report
# init log
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# init all nodes in local Standalone Mode. HESS-SGD only supports 2PC
sf.init(['alice', 'bob'], address='local')
# init PYU, the Python Processing Unit; it processes plaintext within each node.
alice = sf.PYU('alice')
bob = sf.PYU('bob')
# init SPU, the Secure Processing Unit;
# it processes ciphertext under the protection of a multi-party secure computation protocol
spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
# first, init a HEU device where alice is the sk_keeper and bob is the evaluator
heu_config = sf.utils.testing.heu_config(sk_keeper='alice', evaluators=['bob'])
heu_x = sf.HEU(heu_config, spu.cluster_def['runtime_config']['field'])
# then, init a HEU device where bob is the sk_keeper and alice is the evaluator
heu_config = sf.utils.testing.heu_config(sk_keeper='bob', evaluators=['alice'])
heu_y = sf.HEU(heu_config, spu.cluster_def['runtime_config']['field'])
# read data in each party
def read_x(start, end):
    # use breast_cancer as example
    from sklearn.datasets import load_breast_cancer
    from sklearn.preprocessing import StandardScaler
    x = load_breast_cancer()['data']
    # LR's train dataset must be standardized or normalized
    scaler = StandardScaler()
    x = scaler.fit_transform(x)
    return x[:, start:end]
def read_y():
    from sklearn.datasets import load_breast_cancer
    return load_breast_cancer()['target']
# alice / bob each hold half of the features of the data
# read_x is executed locally on each node.
v_data = FedNdarray(
    partitions={
        alice: alice(read_x)(0, 15),
        bob: bob(read_x)(15, 30),
    },
    partition_way=PartitionWay.VERTICAL,
)
# Y label belongs to alice
label_data = FedNdarray(
    partitions={alice: alice(read_y)()},
    partition_way=PartitionWay.VERTICAL,
)
# wait for IO to finish
wait([p.data for p in v_data.partitions.values()])
wait([p.data for p in label_data.partitions.values()])
# split train data and test data
random_state = 1234
split_factor = 0.8
v_train_data, v_test_data = train_test_split(v_data, train_size=split_factor, random_state=random_state)
v_train_label, v_test_label = train_test_split(label_data, train_size=split_factor, random_state=random_state)
# run HESS-SGD
# HESSLogisticRegression uses the SPU / HEU devices to fit the model.
model = HESSLogisticRegression(spu, heu_y, heu_x)
# signature: HESSLogisticRegression(spu, heu_x, heu_y)
#   spu – SPU device.
#   heu_x – HEU device without label.
#   heu_y – HEU device with label.
# Here the label belongs to alice, so the heu_x variable (sk_keeper='alice')
# is passed as the heu_y argument.
start = time.time()
model.fit(
    v_train_data,
    v_train_label,
    learning_rate=0.3,
    epochs=5,
    batch_size=32,
)
logging.info(f"train time: {time.time() - start}")
# run prediction
start = time.time()
# the result is now stored in the SPU as ciphertext
spu_yhat = model.predict(v_test_data)
# reveal for auc, acc and classification report test.
yhat = reveal(spu_yhat)
logging.info(f"predict time: {time.time() - start}")
y = reveal(v_test_label.partitions[alice])
# get the area under curve (AUC) score of the classification
logging.info(f"auc: {roc_auc_score(y, yhat)}")
binary_class_results = np.where(yhat > 0.5, 1, 0)
# get the accuracy score of classification
logging.info(f"acc: {accuracy_score(y, binary_class_results)}")
# get the report of classification
print("classification report:")
print(classification_report(y, binary_class_results))
Algorithm Details#
The main flow is as follows:
Step 1: Initialize the dataset
The dataset must satisfy X.rows > X.cols: 1) with too few samples the model will not converge; 2) Y could be leaked.
Y must be held by Bob.
Initialize w1 / w2 as the weights corresponding to the features held by Alice / Bob respectively.
Encrypt w1 with Bob's pk -> hw1; the ciphertext is stored at Alice. Encrypt w2 with Alice's pk -> hw2; the ciphertext is stored at Bob.
Step 2: Run mini-batch gradient descent, repeating the following steps until the target number of iterations is reached:
Alice / Bob read the current batch's x1 / x2 and y respectively.
Encrypt x1 with Bob's pk -> hx1; the ciphertext is stored at Alice. Encrypt x2 with Alice's pk -> hx2; the ciphertext is stored at Bob.
Bob shares <y> via secret sharing.
Alice locally computes the partial prediction hp1 = hx1 * hw1; Bob locally computes the partial prediction hp2 = hx2 * hw2.
Use the H2S operation to convert the homomorphically encrypted predictions to secret sharing: H2S(hp1) -> <p1>, H2S(hp2) -> <p2>
Compute <error> = Sigmoid(<p1> + <p2>) - <y>, where the Sigmoid function is approximated by y = 0.5 + 0.125 * x
Use Bob's pk for S2H(<error>) -> he1; the ciphertext is stored at Alice. Use Alice's pk for S2H(<error>) -> he2; the ciphertext is stored at Bob.
Alice locally computes hw1 = hw1 - he1 * hx1 * learning_rate; Bob locally computes hw2 = hw2 - he2 * hx2 * learning_rate.
Step 3: Output the model
Use the H2S operation to convert hw1, hw2 to secret sharing: H2S(hw1) -> <w1>, H2S(hw2) -> <w2>
<w> = concatenate(<w1>, <w2>)
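As a dataflow sketch only, the following plaintext mock mirrors one iteration of the steps above (no real HE or secret sharing: plain NumPy arrays stand in for the h* ciphertexts and <*> shares, and the update is scaled by batch size as in the SS-SGD update):
import numpy as np

def sigmoid_approx(x):
    # the linear approximation used above: sigmoid(x) ~= 0.5 + 0.125 x
    return 0.5 + 0.125 * x

def hess_sgd_step_mock(w1, w2, x1, x2, y, learning_rate=0.3):
    p1 = x1 @ w1  # Alice's local partial prediction (stands in for hp1)
    p2 = x2 @ w2  # Bob's local partial prediction (stands in for hp2)
    error = sigmoid_approx(p1 + p2) - y  # <error>, computed on secret shares
    # after S2H, each party updates its "encrypted" weights locally
    w1 = w1 - x1.T @ error * learning_rate / len(y)
    w2 = w2 - x2.T @ error * learning_rate / len(y)
    return w1, w2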
Security Analysis#
First, consider whether the data exchanged during computation leaks any plaintext information. Two kinds of interaction occur during the computation:
The HE encryption/decryption and the H2S/S2H ciphertext-state conversions marked with ->:
The security of HE encryption/decryption relies entirely on the HE algorithm itself.
When H2S creates shares, a random mask is first added to the ciphertext before decryption, so no plaintext information is leaked.
S2H first encrypts one party's share and then reduces the other shares onto the ciphertext, so no plaintext information is leaked.
Interactions during sharing and joint computation in the secret sharing domain. Their security relies on the MPC protocol in use; with the default ABY3 protocol, as long as the SPU nodes do not collude, it is guaranteed that no plaintext information can be recovered by analyzing the data exchanged between nodes.
The final output remains in secret sharing form; before reveal, no information about w can be inferred.
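To make the H2S/S2H reasoning concrete, here is a toy integer mock of the two conversions under additive masking (a sketch only: encrypt/decrypt are identity stand-ins, not the HEU API, and real ciphertexts would replace them):
import numpy as np

FIELD = 2**64  # toy ring in which additive shares live

encrypt = lambda m: m  # identity stand-in for HE encryption
decrypt = lambda c: c  # identity stand-in for HE decryption

def h2s(hm, rng):
    # HE -> SS: the evaluator adds a random mask r under HE; the sk_keeper
    # decrypts m + r and only ever sees a uniformly masked value.
    r = int(rng.integers(0, FIELD))
    sk_share = decrypt(hm + r) % FIELD  # sk_keeper's share: m + r
    ev_share = (-r) % FIELD             # evaluator's share: -r
    return sk_share, ev_share

def s2h(share_a, share_b):
    # SS -> HE: encrypt one party's share, then reduce the other share
    # onto the ciphertext; no plaintext is exposed along the way.
    return encrypt(share_a) + share_b

rng = np.random.default_rng(0)
m = 42
s1, s2 = h2s(encrypt(m), rng)
assert (s1 + s2) % FIELD == m
assert decrypt(s2h(s1, s2)) % FIELD == m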