secretflow.utils.simulation.datasets 源代码

# Copyright 2022 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import os
import pickle
import random
import zipfile
from collections import namedtuple
from pathlib import Path
from typing import Dict, List, Tuple, Union

import networkx as nx
import numpy as np
import pandas as pd
import requests
import scipy

from secretflow.data.horizontal import HDataFrame
from secretflow.data.ndarray import FedNdarray, PartitionWay
from secretflow.data.vertical import VDataFrame
from secretflow.data.vertical import read_csv as v_read_csv
from secretflow.device.device.pyu import PYU
from secretflow.security.aggregation import Aggregator
from secretflow.security.compare import Comparator
from secretflow.utils.hash import sha256sum
from secretflow.utils.simulation.data.dataframe import create_df, create_vdf
from secretflow.utils.simulation.data.ndarray import create_ndarray

_CACHE_DIR = os.path.join(os.path.expanduser('~'), '.secretflow/datasets')

_Dataset = namedtuple('_Dataset', ['filename', 'url', 'sha256'])

_DATASETS = {
    'iris': _Dataset(
        'iris.csv',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/iris/iris.csv',
        '92cae857cae978e0c25156265facc2300806cf37eb8700be094228b374f5188c',
    ),
    'dermatology': _Dataset(
        'dermatology.csv',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/dermatology/dermatology.csv',
        '76b63f6c2be12347b1b76f485c6e775e36d0ab5412bdff0e9df5a9885f5ae11e',
    ),
    'bank_marketing': _Dataset(
        'bank.csv',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/bank_marketing/bank.csv',
        'dc8d576e9bda0f41ee891251bd84bab9a39ce576cba715aac08adc2374a01fde',
    ),
    'mnist': _Dataset(
        'mnist.npz',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/mnist/mnist.npz',
        '731c5ac602752760c8e48fbffcf8c3b850d9dc2a2aedcf2cc48468fc17b673d1',
    ),
    'linear': _Dataset(
        'linear.csv',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/linear/linear.csv',
        'bf269b267eb9e6985ae82467a4e1ece420de90f3107633cb9b9aeda6632c0052',
    ),
    'cora': _Dataset(
        'cora.zip',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/cora/cora.zip',
        'd7018f2d7d2b693abff6f6f7ccaf9d70e2e428ca068830863f19a37d8575fd01',
    ),
    'bank_marketing_full': _Dataset(
        'bank-full.csv',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/bank_marketing/bank-full.csv',
        'd1513ec63b385506f7cfce9f2c5caa9fe99e7ba4e8c3fa264b3aaf0f849ed32d',
    ),
    'ml-1m': _Dataset(
        'ml-1m.zip',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip',
        'a6898adb50b9ca05aa231689da44c217cb524e7ebd39d264c56e2832f2c54e20',
    ),
    'pubmed': _Dataset(
        'pubmed.zip',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/pubmed/pubmed.zip',
        '04a5aa8b3b3432d617d35286e42011b64d58ac362a107d2c257d9da85bf0c021',
    ),
    'citeseer': _Dataset(
        'citeseer.zip',
        'https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/citeseer/citeseer.zip',
        '8f0f1aba42c7be5818dc43d96913713a2ffc1c0d9dc09bef30d0432d2c102b49',
    ),
}


def _unzip(file, extract_path=None):
    if not extract_path:
        extract_path = str(Path(file).parent)
    with zipfile.ZipFile(file, 'r') as zip_f:
        zip_f.extractall(extract_path)


def _download(url: str, filepath: str, sha256: str):
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    content = requests.get(url, stream=True).content
    h = hashlib.sha256()
    h.update(content)
    actual_sha256 = h.hexdigest()
    assert (
        sha256 == actual_sha256
    ), f'Failed to check sha256 of {url}, expected {sha256}, got {actual_sha256}.'

    with open(filepath, 'wb') as f:
        f.write(content)


def _get_dataset(dataset: _Dataset, cache_dir: str = None):
    if not cache_dir:
        cache_dir = _CACHE_DIR

    filepath = f'{cache_dir}/{dataset.filename}'
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)

    import filelock

    with filelock.FileLock(f'{filepath}.lock'):
        need_download = not Path(filepath).exists()
        if not need_download:
            sha256 = sha256sum(filepath)
            if sha256 != dataset.sha256:
                os.remove(filepath)
                need_download = True

        if need_download:
            assert (
                dataset.url
            ), f'{dataset.filename} does not exist locally, please give a download url.'

            _download(dataset.url, filepath, dataset.sha256)
        return filepath


[文档]def dataset(name: str, cache_dir: str = None) -> str: """Get the specific dataset file path. Args: name: the dataset name, should be one of ['iris', 'dermatology', 'bank_marketing', 'mnist', 'linear']. Returns: the dataset file path. """ assert name and isinstance(name, str), 'Name shall be a valid string.' name = name.lower() return _get_dataset(_DATASETS[name], cache_dir)
[文档]def load_iris( parts: Union[List[PYU], Dict[PYU, Union[float, Tuple]]], axis=0, aggregator: Aggregator = None, comparator: Comparator = None, ) -> Union[HDataFrame, VDataFrame]: """Load iris dataset to federated dataframe. This dataset includes columns: 1. sepal_length 2. sepal_width 3. petal_length 4. petal_width 5. class This dataset originated from `Iris <https://archive.ics.uci.edu/ml/datasets/iris>`_. Args: parts: the data partitions. The dataset will be distributed as evenly as possible to each PYU if parts is a array of PYUs. If parts is a dict {PYU: value}, the value shall be one of the followings. 1) a float 2) an interval in tuple closed on the left-side and open on the right-side. axis: optional; optional, the value is 0 or 1. 0 means split by row and returns a horizontal partitioning federated DataFrame. 1 means split by column returns a vertical partitioning federated DataFrame. aggregator: optional, shall be provided only when axis is 0. For details, please refer to `secretflow.data.horizontal.HDataFrame`. comparator: optional, shall be provided only when axis is 0. For details, please refer to `secretflow.data.horizontal.HDataFrame`. Returns: return a HDataFrame if axis is 0 else VDataFrame. """ filepath = _get_dataset(_DATASETS['iris']) return create_df( source=filepath, parts=parts, axis=axis, shuffle=False, aggregator=aggregator, comparator=comparator, )
[文档]def load_dermatology( parts: Union[List[PYU], Dict[PYU, Union[float, Tuple]]], axis=0, class_starts_from_zero: bool = True, aggregator: Aggregator = None, comparator: Comparator = None, ) -> Union[HDataFrame, VDataFrame]: """Load dermatology dataset to federated dataframe. This dataset consists of dermatology cancer diagnosis. For the original dataset please refer to `Dermatology <https://archive.ics.uci.edu/ml/datasets/dermatology>`_. Args: parts: the data partitions. The dataset will be distributed as evenly as possible to each PYU if parts is a array of PYUs. If parts is a dict {PYU: value}, the value shall be one of the followings. 1) a float 2) an interval in tuple closed on the left-side and open on the right-side. axis: optional, the value could be 0 or 1. 0 means split by row and returns a horizontal partitioning federated DataFrame. 1 means split by column returns a vertical partitioning federated DataFrame. class_starts_from_zero: optional, class starts from zero if True. aggregator: optional, shall be provided only when axis is 0. For details, please refer to `secretflow.data.horizontal.HDataFrame`. comparator: optional, shall be provided only when axis is 0. For details, please refer to `secretflow.data.horizontal.HDataFrame`. Returns: return a HDataFrame if axis is 0 else VDataFrame. """ filepath = _get_dataset(_DATASETS['dermatology']) df = pd.read_csv(filepath) if class_starts_from_zero: df['class'] = df['class'] - 1 return create_df( source=df, parts=parts, axis=axis, shuffle=False, aggregator=aggregator, comparator=comparator, )
[文档]def load_bank_marketing( parts: Union[List[PYU], Dict[PYU, Union[float, Tuple]]], axis=0, full=False, aggregator: Aggregator = None, comparator: Comparator = None, ) -> Union[HDataFrame, VDataFrame]: """Load bank marketing dataset to federated dataframe. This dataset is related with direct marketing campaigns. For the original dataset please refer to `Bank marketing <https://archive.ics.uci.edu/ml/datasets/bank+marketing>`_. Args: parts: the data partitions. The dataset will be distributed as evenly as possible to each PYU if parts is a array of PYUs. If parts is a dict {PYU: value}, the value shall be one of the followings. 1) a float 2) an interval in tuple closed on the left-side and open on the right-side. axis: optional, the value is 0 or 1. 0 means split by row and returns a horizontal partitioning federated DataFrame. 1 means split by column returns a vertical partitioning federated DataFrame. full: optional. indicates whether to load to full version of dataset. aggregator: optional, shall be provided only when axis is 0. For details, please refer to `secretflow.data.horizontal.HDataFrame`. comparator: optional, shall be provided only when axis is 0. For details, please refer to `secretflow.data.horizontal.HDataFrame`. Returns: return a HDataFrame if axis is 0 else VDataFrame. """ if full: filepath = _get_dataset(_DATASETS['bank_marketing_full']) else: filepath = _get_dataset(_DATASETS['bank_marketing']) return create_df( lambda: pd.read_csv(filepath, sep=';'), parts=parts, axis=axis, shuffle=False, aggregator=aggregator, comparator=comparator, )
[文档]def load_mnist( parts: Union[List[PYU], Dict[PYU, Union[float, Tuple]]], normalized_x: bool = True, categorical_y: bool = False, is_torch: bool = False, ) -> Tuple[Tuple[FedNdarray, FedNdarray], Tuple[FedNdarray, FedNdarray]]: """Load mnist dataset to federated ndarrays. This dataset has a training set of 60,000 examples, and a test set of 10,000 examples. Each example is a 28x28 grayscale image of the 10 digits. For the original dataset please refer to `MNIST <http://yann.lecun.com/exdb/mnist/>`_. Args: parts: the data partitions. The dataset will be distributed as evenly as possible to each PYU if parts is a array of PYUs. If parts is a dict {PYU: value}, the value shall be one of the followings. 1) a float 2) an interval in tuple closed on the left-side and open on the right-side. normalized_x: optional, normalize x if True. Default to True. categorical_y: optional, do one hot encoding to y if True. Default to True. Returns: A tuple consists of two tuples, (x_train, y_train) and (x_train, y_train). """ filepath = _get_dataset(_DATASETS['mnist']) with np.load(filepath) as f: x_train, y_train = f['x_train'], f['y_train'] x_test, y_test = f['x_test'], f['y_test'] if normalized_x: x_train, x_test = x_train / 255, x_test / 255 if categorical_y: from sklearn.preprocessing import OneHotEncoder encoder = OneHotEncoder(sparse=False) y_train = encoder.fit_transform(y_train.reshape(-1, 1)) y_test = encoder.fit_transform(y_test.reshape(-1, 1)) return ( ( create_ndarray(x_train, parts=parts, axis=0, is_torch=is_torch), create_ndarray(y_train, parts=parts, axis=0), ), ( create_ndarray(x_test, parts=parts, axis=0, is_torch=is_torch), create_ndarray(y_test, parts=parts, axis=0), ), )
[文档]def load_linear(parts: Union[List[PYU], Dict[PYU, Union[float, Tuple]]]) -> VDataFrame: """Load the linear dataset to federated dataframe. This dataset is random generated and includes columns: 1) id 2) 20 features: [x1, x2, x3, ..., x19, x20] 3) y Args: parts: the data partitions. The dataset will be distributed as evenly as possible to each PYU if parts is a array of PYUs. If parts is a dict {PYU: value}, the value shall be one of the followings. 1) a float 2) an interval in tuple closed on the left-side and open on the right-side. Returns: return a VDataFrame. """ filepath = _get_dataset(_DATASETS['linear']) return create_vdf(source=filepath, parts=parts, shuffle=False)
[文档]def load_cora( parts: List[PYU], data_dir: str = None, add_self_loop: bool = True ) -> Tuple[ FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, ]: """Load the cora dataset for split learning GNN. Args: parts (List[PYU]): parties that the paper features will be partitioned evenly. Returns: A tuple of FedNdarray: edge, x, Y_train, Y_val, Y_valid, index_train, index_val, index_test. Note that Y is bound to the first participant. """ assert parts, 'Parts shall not be None or empty!' if data_dir is None: data_dir = os.path.join(_CACHE_DIR, 'cora') if not Path(data_dir).is_dir(): filepath = _get_dataset(_DATASETS['cora']) _unzip(filepath, data_dir) file_names = [ os.path.join(data_dir, f'ind.cora.{name}') for name in ['y', 'tx', 'ty', 'allx', 'ally', 'graph'] ] objects = [] for name in file_names: with open(name, 'rb') as f: objects.append(pickle.load(f, encoding='latin1')) y, tx, ty, allx, ally, graph = tuple(objects) with open(os.path.join(data_dir, f"ind.cora.test.index"), 'r') as f: test_idx_reorder = f.readlines() test_idx_reorder = list(map(lambda s: int(s.strip()), test_idx_reorder)) test_idx_range = np.sort(test_idx_reorder) nodes = scipy.sparse.vstack((allx, tx)).tolil() nodes[test_idx_reorder, :] = nodes[test_idx_range, :] edge_sparse = nx.adjacency_matrix(nx.from_dict_of_lists(graph)) labels = np.vstack((ally, ty)) labels[test_idx_reorder, :] = labels[test_idx_range, :] idx_test = test_idx_range.tolist() idx_train = range(len(y)) idx_val = range(len(y), len(y) + 500) def sample_mask(idx, length): mask = np.zeros(length) mask[idx] = 1 return np.array(mask, dtype=np.bool) train_mask = sample_mask(idx_train, labels.shape[0]) val_mask = sample_mask(idx_val, labels.shape[0]) test_mask = sample_mask(idx_test, labels.shape[0]) y_train = np.zeros(labels.shape) y_val = np.zeros(labels.shape) y_test = np.zeros(labels.shape) y_train[train_mask, :] = labels[train_mask, :] y_val[val_mask, :] = labels[val_mask, :] y_test[test_mask, :] = labels[test_mask, :] def edge_dense(edge: np.ndarray): if add_self_loop: return edge + np.eye(edge.shape[1]) else: return edge.toarray() nodes = nodes.toarray() edge_arr = FedNdarray( partitions={part: part(edge_dense)(edge_sparse) for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) feature_split_idxs = np.rint(np.linspace(0, nodes.shape[1], len(parts) + 1)).astype( np.int32 ) x_arr = FedNdarray( partitions={ part: part( lambda: nodes[:, feature_split_idxs[i] : feature_split_idxs[i + 1]] )() for i, part in enumerate(parts) }, partition_way=PartitionWay.VERTICAL, ) Y_train_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_train)()}, partition_way=PartitionWay.HORIZONTAL, ) Y_val_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_val)()}, partition_way=PartitionWay.HORIZONTAL, ) Y_test_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_test)()}, partition_way=PartitionWay.HORIZONTAL, ) idx_train_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: train_mask)()}, partition_way=PartitionWay.HORIZONTAL, ) idx_val_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: val_mask)()}, partition_way=PartitionWay.HORIZONTAL, ) idx_test_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: test_mask)()}, partition_way=PartitionWay.HORIZONTAL, ) return ( edge_arr, x_arr, Y_train_arr, Y_val_arr, Y_test_arr, idx_train_arr, idx_val_arr, idx_test_arr, )
[文档]def load_pubmed( parts: List[PYU], data_dir: str = None, add_self_loop: bool = True ) -> Tuple[ FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, ]: """Load the pubmed dataset for split learning GNN. Datasource: https://github.com/kimiyoung/planetoid/tree/master/data Args: parts (List[PYU]): parties that the paper features will be partitioned evenly. Returns: A tuple of FedNdarray: edge, x, Y_train, Y_val, Y_valid, index_train, index_val, index_test. Note that Y is bound to the first participant. """ assert parts, 'Parts shall not be None or empty!' if data_dir is None: data_dir = os.path.join(_CACHE_DIR, 'pubmed') if not Path(data_dir).is_dir(): filepath = _get_dataset(_DATASETS['pubmed']) _unzip(filepath, data_dir) file_names = [ os.path.join(data_dir, f'ind.pubmed.{name}') for name in ['y', 'tx', 'ty', 'allx', 'ally', 'graph'] ] objects = [] for name in file_names: with open(name, 'rb') as f: objects.append(pickle.load(f, encoding='latin1')) y, tx, ty, allx, ally, graph = tuple(objects) with open(os.path.join(data_dir, f"ind.pubmed.test.index"), 'r') as f: test_idx_reorder = f.readlines() test_idx_reorder = list(map(lambda s: int(s.strip()), test_idx_reorder)) test_idx_range = np.sort(test_idx_reorder) nodes = scipy.sparse.vstack((allx, tx)).tolil() nodes[test_idx_reorder, :] = nodes[test_idx_range, :] edge_sparse = nx.adjacency_matrix(nx.from_dict_of_lists(graph)) labels = np.vstack((ally, ty)) labels[test_idx_reorder, :] = labels[test_idx_range, :] # test 1000 # train #class * 20 = 7 * 20 = 140 # val 500 idx_test = test_idx_range.tolist() idx_train = range(len(y)) idx_val = range(len(y), len(y) + 500) def sample_mask(idx, length): mask = np.zeros(length) mask[idx] = 1 return np.array(mask, dtype=np.bool) train_mask = sample_mask(idx_train, labels.shape[0]) val_mask = sample_mask(idx_val, labels.shape[0]) test_mask = sample_mask(idx_test, labels.shape[0]) y_train = np.zeros(labels.shape) y_val = np.zeros(labels.shape) y_test = np.zeros(labels.shape) y_train[train_mask, :] = labels[train_mask, :] y_val[val_mask, :] = labels[val_mask, :] y_test[test_mask, :] = labels[test_mask, :] def edge_dense(edge: np.ndarray): if add_self_loop: return edge + np.eye(edge.shape[1]) else: return edge.toarray() nodes = nodes.toarray() edge_arr = FedNdarray( partitions={part: part(edge_dense)(edge_sparse) for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) feature_split_idxs = np.rint(np.linspace(0, nodes.shape[1], len(parts) + 1)).astype( np.int32 ) x_arr = FedNdarray( partitions={ part: part( lambda: nodes[:, feature_split_idxs[i] : feature_split_idxs[i + 1]] )() for i, part in enumerate(parts) }, partition_way=PartitionWay.VERTICAL, ) Y_train_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_train)()}, partition_way=PartitionWay.HORIZONTAL, ) Y_val_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_val)()}, partition_way=PartitionWay.HORIZONTAL, ) Y_test_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_test)()}, partition_way=PartitionWay.HORIZONTAL, ) idx_train_arr = FedNdarray( partitions={part: part(lambda: train_mask)() for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) idx_val_arr = FedNdarray( partitions={part: part(lambda: val_mask)() for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) idx_test_arr = FedNdarray( partitions={part: part(lambda: test_mask)() for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) return ( edge_arr, x_arr, Y_train_arr, Y_val_arr, Y_test_arr, idx_train_arr, idx_val_arr, idx_test_arr, )
[文档]def load_citeseer( parts: List[PYU], data_dir: str = None, add_self_loop: bool = True ) -> Tuple[ FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, FedNdarray, ]: """Load the citeseer dataset for split learning GNN. Datasource: https://github.com/kimiyoung/planetoid/tree/master/data Args: parts (List[PYU]): parties that the paper features will be partitioned evenly. Returns: A tuple of FedNdarray: edge, x, Y_train, Y_val, Y_valid, index_train, index_val, index_test. Note that Y is bound to the first participant. """ assert parts, 'Parts shall not be None or empty!' if data_dir is None: data_dir = os.path.join(_CACHE_DIR, 'citeseer') if not Path(data_dir).is_dir(): filepath = _get_dataset(_DATASETS['citeseer']) _unzip(filepath, data_dir) file_names = [ os.path.join(data_dir, f'ind.citeseer.{name}') for name in ['x', 'y', 'tx', 'ty', 'allx', 'ally', 'graph'] ] objects = [] for name in file_names: with open(name, 'rb') as f: objects.append(pickle.load(f, encoding='latin1')) x, y, tx, ty, allx, ally, graph = tuple(objects) with open(os.path.join(data_dir, f"ind.citeseer.test.index"), 'r') as f: test_idx_reorder = f.readlines() test_idx_reorder = list(map(lambda s: int(s.strip()), test_idx_reorder)) test_idx_range = np.sort(test_idx_reorder) # Fix citeseer dataset (there are some isolated nodes in the graph) # Find isolated nodes, add them as zero-vecs into the right position test_idx_range_full = range(min(test_idx_reorder), max(test_idx_reorder) + 1) zero_ind = list(set(test_idx_range_full) - set(test_idx_reorder)) tx_extended = scipy.sparse.lil_matrix((len(test_idx_range_full), x.shape[1])) tx_extended[test_idx_range - min(test_idx_range), :] = tx tx = tx_extended ty_extended = np.zeros((len(test_idx_range_full), y.shape[1])) ty_extended[test_idx_range - min(test_idx_range), :] = ty ty_extended[ zero_ind - min(test_idx_range), np.random.randint(0, y.shape[1], len(zero_ind)), ] = 1 ty = ty_extended nodes = scipy.sparse.vstack((allx, tx)).tolil() nodes[test_idx_reorder, :] = nodes[test_idx_range, :] edge_sparse = nx.adjacency_matrix(nx.from_dict_of_lists(graph)) labels = np.vstack((ally, ty)) labels[test_idx_reorder, :] = labels[test_idx_range, :] # test 1000 # train #class * 20 = 6 * 20 = 120 # val 500 idx_test = test_idx_range.tolist() idx_train = range(len(y)) idx_val = range(len(y), len(y) + 500) def sample_mask(idx, length): mask = np.zeros(length) mask[idx] = 1 return np.array(mask, dtype=np.bool) train_mask = sample_mask(idx_train, labels.shape[0]) val_mask = sample_mask(idx_val, labels.shape[0]) test_mask = sample_mask(idx_test, labels.shape[0]) y_train = np.zeros(labels.shape) y_val = np.zeros(labels.shape) y_test = np.zeros(labels.shape) y_train[train_mask, :] = labels[train_mask, :] y_val[val_mask, :] = labels[val_mask, :] y_test[test_mask, :] = labels[test_mask, :] def edge_dense(edge: np.ndarray): if add_self_loop: return edge + np.eye(edge.shape[1]) else: return edge.toarray() nodes = nodes.toarray() edge_arr = FedNdarray( partitions={part: part(edge_dense)(edge_sparse) for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) feature_split_idxs = np.rint(np.linspace(0, nodes.shape[1], len(parts) + 1)).astype( np.int32 ) x_arr = FedNdarray( partitions={ part: part( lambda: nodes[:, feature_split_idxs[i] : feature_split_idxs[i + 1]] )() for i, part in enumerate(parts) }, partition_way=PartitionWay.VERTICAL, ) Y_train_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_train)()}, partition_way=PartitionWay.HORIZONTAL, ) Y_val_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_val)()}, partition_way=PartitionWay.HORIZONTAL, ) Y_test_arr = FedNdarray( partitions={parts[0]: parts[0](lambda: y_test)()}, partition_way=PartitionWay.HORIZONTAL, ) idx_train_arr = FedNdarray( partitions={part: part(lambda: train_mask)() for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) idx_val_arr = FedNdarray( partitions={part: part(lambda: val_mask)() for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) idx_test_arr = FedNdarray( partitions={part: part(lambda: test_mask)() for part in parts}, partition_way=PartitionWay.HORIZONTAL, ) return ( edge_arr, x_arr, Y_train_arr, Y_val_arr, Y_test_arr, idx_train_arr, idx_val_arr, idx_test_arr, )
[文档]def load_ml_1m( part: Dict[PYU, List], data_dir: str = None, shuffle: bool = False, num_sample: int = -1, ): """Load the movie lens 1M dataset for split learning. Args: parts (Dict[PYU, List]): party map features columns data_dir: data dir if data has been downloaded shuffle: whether need shuffle num_sample: num of samples, default -1 for all Returns: A tuple of FedNdarray: edge, x, Y_train, Y_val, Y_valid, index_train, index_val, index_test. Note that Y is bound to the first participant. """ def _load_data(filename, columns): data = {} with open(filename, "r", encoding="unicode_escape") as f: for line in f: ls = line.strip("\n").split("::") data[ls[0]] = dict(zip(columns[1:], ls[1:])) return data def _shuffle_data(filename): shuffled_filename = f"{filename}.shuffled" with open(filename, "r") as f: lines = f.readlines() random.shuffle(lines) with open(shuffled_filename, "w") as f: f.writelines(lines) return shuffled_filename def _parse_example(feature, columns, index): if "Title" in feature.keys(): feature["Title"] = feature["Title"].replace(",", "_") if "Genres" in feature.keys(): feature["Genres"] = feature["Genres"].replace("|", " ") values = [] values.append(str(index)) for c in columns: values.append(feature[c]) return ",".join(values) if data_dir is None: data_dir = os.path.join(_CACHE_DIR, 'ml-1m') if not Path(data_dir).is_dir(): filepath = _get_dataset(_DATASETS['ml-1m']) _unzip(filepath, data_dir) extract_dir = os.path.join(data_dir, 'ml-1m') users_data = _load_data( extract_dir + "/users.dat", columns=["UserID", "Gender", "Age", "Occupation", "Zip-code"], ) movies_data = _load_data( extract_dir + "/movies.dat", columns=["MovieID", "Title", "Genres"] ) ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"] index = 0 csv_writer_container = {} fed_csv = {} for device, columns in part.items(): file_name = os.path.join(data_dir, device.party + ".csv") fed_csv[device] = file_name _csv_writer = open(file_name, "w") csv_writer_container[device] = _csv_writer _csv_writer.write("ID," + ",".join(columns) + "\n") if shuffle: shuffled_filename = _shuffle_data(extract_dir + "/ratings.dat") f = open(shuffled_filename, "r", encoding="unicode_escape") else: f = open(extract_dir + "/ratings.dat", "r", encoding="unicode_escape") for line in f: ls = line.strip().split("::") rating = dict(zip(ratings_columns, ls)) rating.update(users_data.get(ls[0])) rating.update(movies_data.get(ls[1])) for device, columns in part.items(): parse_f = _parse_example(rating, columns, index) csv_writer_container[device].write(parse_f + "\n") index += 1 if num_sample > 0 and index >= num_sample: break for w in csv_writer_container.values(): w.close() return v_read_csv( fed_csv, keys="ID", drop_keys="ID", )