use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use itertools::Itertools;
use crate::proto;
use crate::base::{GroupId, IndexKey, Release, ValueProperties};
use crate::components::Mechanism;
use crate::errors::*;
use crate::utilities::{get_common_value, get_dependents, get_input_properties};
/// Key identifying one batch of releases; in this file it is built as
/// `(submission_id, dependency_id)`.
type BatchIdentifier = (u32, u32);
/// Node ids of the partition components found while batching a graph.
type PartitionIds = Vec<u32>;
/// Combines every privacy usage of a single batch into one usage.
///
/// The usages are combined pairwise, left to right, with the `+` operator
/// defined on `proto::PrivacyUsage`. An empty batch yields an approximate
/// usage with `epsilon = 0` and `delta = 0`.
///
/// # Errors
/// Returns the first error produced while adding two usages.
fn compute_batch_privacy_usage(
    privacy_usages: Vec<&proto::PrivacyUsage>
) -> Result<proto::PrivacyUsage> {
    let mut remaining = privacy_usages.into_iter().cloned();

    // nothing to add up: the batch consumed no privacy budget
    let first = match remaining.next() {
        Some(usage) => usage,
        None => {
            let no_cost = proto::privacy_usage::DistanceApproximate {
                epsilon: 0.,
                delta: 0.,
            };
            return Ok(proto::PrivacyUsage {
                distance: Some(proto::privacy_usage::Distance::Approximate(no_cost))
            });
        }
    };

    // accumulate left to right, stopping at the first failed addition
    remaining.try_fold(first, |total, usage| total + usage)
}
/// Groups the privacy usages of `graph` into batches and collects partition node ids.
///
/// Components are first grouped by their `submission` field. Within each submission,
/// nodes are labelled with a dependency id that starts at 0 and is incremented for the
/// dependents of any node that has an entry in `privacy_usages`. All privacy usages of
/// nodes sharing the same `(submission_id, dependency_id)` form one batch.
///
/// Nodes reachable from the dependents of a partition component are excluded from the
/// batches; the ids of partition components are returned separately.
///
/// # Arguments
/// * `graph` - components keyed by node id
/// * `privacy_usages` - privacy usages keyed by node id; the returned batches borrow from it
///
/// # Returns
/// A map from batch identifier to the borrowed privacy usages in that batch, and the
/// list of partition node ids that were not reached from another partition's dependents.
///
/// # Panics
/// Panics if any component in `graph` has no `variant`.
fn batch_partition<'a>(
graph: &HashMap<u32, proto::Component>,
privacy_usages: &'a HashMap<u32, Vec<proto::PrivacyUsage>>,
) -> Result<(HashMap<BatchIdentifier, Vec<&'a proto::PrivacyUsage>>, PartitionIds)> {
// split the graph into one sub-graph (of cloned components) per submission id
let mut submissions = HashMap::<u32, HashMap<u32, proto::Component>>::new();
graph.iter()
.for_each(|(node_id, component)| {
submissions
.entry(component.submission)
.or_insert_with(HashMap::new)
.insert(*node_id, component.clone());
});
// batches accumulated over all submissions, keyed by (submission_id, dependency_id)
let mut batches = HashMap::<BatchIdentifier, Vec<&proto::PrivacyUsage>>::new();
let mut partition_ids = Vec::new();
submissions.into_iter()
.try_for_each(|(submission_id, subgraph)| {
// presumably maps a node id to the ids of the nodes that depend on it,
// restricted to this submission -- confirm against get_dependents
let parents = get_dependents(&subgraph);
// dependency id assigned to each visited node of this submission
let mut dependency_ids = HashMap::<u32, u32>::new();
// nodes excluded from batching, and later also nodes that were already visited
let mut blacklist = HashSet::new();
// seed the exclusion traversal with the dependents of every partition component.
// NOTE(review): partitions are looked up in the full `graph`, not in `subgraph`, and
// this runs once per submission, so with several submissions each partition id looks
// like it is pushed onto `partition_ids` more than once -- confirm this is intended
let mut blacklist_traversal = graph.iter()
.filter(|(_, component)|
matches!(component.variant.as_ref().unwrap(), proto::component::Variant::Partition(_)))
.map(|(id, _)| {
blacklist.insert(*id);
partition_ids.push(*id);
parents.get(id).cloned().unwrap_or_else(HashSet::new)
})
.flatten()
.collect::<Vec<u32>>();
// blacklist everything connected to the seeded nodes
while !blacklist_traversal.is_empty() {
let node_id = blacklist_traversal.pop().unwrap();
blacklist.insert(node_id);
let component = match graph.get(&node_id) {
Some(component) => component,
None => continue
};
// a partition reached from another partition's dependents is dropped from the returned list
if let Some(proto::component::Variant::Partition(_)) = component.variant {
partition_ids.retain(|v| v != &node_id);
}
// follow both the arguments of the node and its dependents, skipping blacklisted ids
component.arguments().values()
.filter(|id| !blacklist.contains(id))
.for_each(|id| blacklist_traversal.push(*id));
if let Some(ids) = parents.get(&node_id) {
ids.iter()
.filter(|id| !blacklist.contains(id))
.for_each(|id| blacklist_traversal.push(*id));
}
}
// start from non-blacklisted nodes that have no argument inside this submission,
// each with dependency id 0
let mut traversal = subgraph.iter()
.filter(|(id, component)|
!blacklist.contains(id) && component.arguments().values()
.filter(|id| subgraph.contains_key(id))
.count() == 0
)
.map(|(id, _)| (*id, 0))
.collect::<Vec<BatchIdentifier>>();
while !traversal.is_empty() {
let (node_id, mut dependency_id) = traversal.pop().unwrap();
// the blacklist doubles as the visited set from here on
if blacklist.contains(&node_id) {
continue;
}
blacklist.insert(node_id);
{
// NOTE(review): because a node is skipped once it is blacklisted, this entry
// appears to always be vacant here, so the `max` keeps the id of the first path
// that reached the node -- confirm that is the intended dependency id
let prior_value = dependency_ids
.entry(node_id).or_insert(dependency_id);
*prior_value = *prior_value.max(&mut dependency_id);
}
// dependents of a node that has privacy usages fall into the next dependency id
if privacy_usages.contains_key(&node_id) {
dependency_id += 1;
}
if let Some(pars) = parents.get(&node_id) {
traversal.extend(pars.iter().map(|v| (*v, dependency_id)));
}
}
// invert the labelling: dependency id -> set of node ids
let mut dependency_batches = HashMap::<u32, HashSet<u32>>::new();
dependency_ids.iter()
.for_each(|(node_id, dependency_id)| {
dependency_batches.entry(*dependency_id)
.or_insert_with(HashSet::new)
.insert(*node_id);
});
// gather references to every privacy usage of every node in each batch;
// nodes without an entry in `privacy_usages` contribute nothing
dependency_batches.into_iter().for_each(|(dependency_id, batch_values)| {
batches.insert((submission_id, dependency_id), batch_values.iter()
.map(|node_id| privacy_usages.get(node_id))
.flatten().flatten()
.collect::<Vec<&'a proto::PrivacyUsage>>());
});
Ok::<_, Error>(())
})?;
Ok((batches, partition_ids))
}
/// Computes the overall privacy usage of a computation graph.
///
/// The result is the sum of the usages of all batches returned by `batch_partition`
/// for `graph`, plus the usage attributed to partitions. For each partition, the usage
/// is the maximum over its categories of the usage of the sub-graph downstream of that
/// category (computed recursively for nested partitions); partition usages are summed.
///
/// # Arguments
/// * `graph` - components keyed by node id
/// * `privacy_definition` - passed through to each component's `get_privacy_usage`
/// * `properties` - value properties keyed by node id
/// * `release` - released values; their `privacy_usages`, when present, are passed to
///   each component's `get_privacy_usage`
///
/// # Errors
/// Returns an error when a component fails to report its privacy usage, when a
/// partition has no properties, when a dependent of a partition is neither an index
/// nor a union, when an index has no `names` argument or no released `names` value,
/// or when a usage is not an approximate distance.
///
/// # Panics
/// Panics if an id reported by `get_dependents` or found during the downstream
/// traversal is missing from `graph`.
pub fn compute_graph_privacy_usage(
graph: &HashMap<u32, proto::Component>,
privacy_definition: &proto::PrivacyDefinition,
properties: &HashMap<u32, ValueProperties>,
release: &Release,
) -> Result<proto::PrivacyUsage> {
// ask every component for its privacy usages; keep only nodes that report `Some`
let release_privacy_usages = graph.iter()
.map(|(node_id, component)| Ok((*node_id, component.get_privacy_usage(
&privacy_definition,
release.get(node_id)
.and_then(|v| v.privacy_usages.as_ref()),
&get_input_properties(component, &properties)?)?))
)
.collect::<Result<Vec<(u32, Option<Vec<proto::PrivacyUsage>>)>>>()?
.into_iter().filter_map(|(node_id, usages)| Some((node_id, usages?)))
.collect::<HashMap<u32, Vec<proto::PrivacyUsage>>>();
// presumably node id -> ids of nodes that take it as an argument -- confirm against get_dependents
let dependent_edges = get_dependents(graph);
let (batches, partition_ids) = batch_partition(graph, &release_privacy_usages)?;
// additive identity used whenever a fold has nothing to combine
let zero_usage = || proto::PrivacyUsage {
distance: Some(proto::privacy_usage::Distance::Approximate(proto::privacy_usage::DistanceApproximate {
epsilon: 0.0,
delta: 0.0,
}))
};
// ids of the dependents of `partition_id` that belong to `category`:
// an index dependent matches when the released value of its "names" argument equals
// `category`; a union dependent always matches; any other dependent is an error
let get_category_indexes = |
category: IndexKey, partition_id: u32,
| -> Result<Vec<u32>> {
let dependents = match dependent_edges.get(&partition_id) {
Some(dependents) => dependents,
None => return Ok(vec![])
};
Ok(dependents.iter()
.map(|index_id| {
let node = graph.get(index_id).unwrap().clone();
if let Some(proto::component::Variant::Index(_)) = node.variant {
let node_id = node.arguments().remove(&IndexKey::from("names"))
.ok_or_else(|| "names argument must be specified on an index into partitions")?;
Ok((
*index_id,
category == IndexKey::new(release.get(&node_id)
.ok_or_else(|| "names value must be defined")?.value.clone().array()?)?
))
}
else if let Some(proto::component::Variant::Union(_)) = node.variant {
Ok((*index_id, true))
} else {
Err("dependent must be either an index or union".into())
}
})
.collect::<Result<Vec<(u32, bool)>>>()?.iter()
.filter(|pair| pair.1).map(|pair| pair.0)
.collect::<Vec<u32>>())
};
// ids of all nodes reachable from `node_id` through dependent edges (including itself).
// When a category is given and a non-flattening union is reached, only the dependents
// selected by `get_category_indexes` for that category are followed
let get_downstream_ids = |
category: Option<IndexKey>, node_id: u32
| -> Result<HashSet<u32>> {
let mut downstream_ids = HashSet::new();
let mut traversal = vec![node_id];
while !traversal.is_empty() {
let node_id = traversal.pop().unwrap();
if let Some(category) = &category {
if let Some(proto::component::Variant::Union(x)) = graph.get(&node_id)
.and_then(|v| v.variant.as_ref()) {
if !x.flatten {
traversal.extend(get_category_indexes(category.clone(), node_id)?);
downstream_ids.insert(node_id);
continue
}
}
}
if let Some(dependents) = dependent_edges.get(&node_id) {
traversal.extend(dependents);
}
downstream_ids.insert(node_id);
}
Ok(downstream_ids)
};
// sub-graph (cloned components) made of the nodes returned by `get_downstream_ids`
let get_downstream_graph = |
category: Option<IndexKey>, node_id: u32
| -> Result<HashMap<u32, proto::Component>> {
Ok(get_downstream_ids(category, node_id)?.iter()
.map(|node_id| (*node_id, graph.get(node_id).unwrap().clone()))
.collect::<HashMap<u32, proto::Component>>())
};
// component-wise maximum of two approximate usages; propagates errors from either side
let max_usage = |l: Result<proto::PrivacyUsage>, r: Result<proto::PrivacyUsage>| -> Result<proto::PrivacyUsage> {
let proto::privacy_usage::DistanceApproximate {
epsilon: eps_l, delta: del_l
} = match l?.distance {
Some(proto::privacy_usage::Distance::Approximate(x)) => Ok(x),
_ => Err("expected approximate privacy")
}?;
let proto::privacy_usage::DistanceApproximate {
epsilon: eps_r, delta: del_r
} = match r?.distance {
Some(proto::privacy_usage::Distance::Approximate(x)) => Ok(x),
_ => Err("expected approximate privacy")
}?;
Ok(proto::PrivacyUsage {
distance: Some(proto::privacy_usage::Distance::Approximate(proto::privacy_usage::DistanceApproximate {
epsilon: eps_l.max(eps_r),
delta: del_l.max(del_r),
}))
})
};
// usage of a set of (nested) partitions: recurse into the full downstream graph of each
// partition and keep the maximum; zero when there are no partitions
let compute_all_partitions_usage = |
partition_ids: Vec<u32>
| -> Result<proto::PrivacyUsage> {
partition_ids.iter()
.map(|partition_id| compute_graph_privacy_usage(
&get_downstream_graph(None, *partition_id)?,
privacy_definition, properties, release))
.fold1(max_usage)
.unwrap_or_else(|| Ok(zero_usage()))
};
// usage attributed to the partitions of this graph: summed over partitions,
// maximum over the categories of each partition
let partitions_usage: proto::PrivacyUsage = partition_ids.into_iter()
.map(|partition_node_id| {
let partition_properties = properties.get(&partition_node_id)
.ok_or_else(|| "partition properties must be defined")?;
partition_properties.partitions()?.children.keys()
.map(|category| {
// merge the downstream graphs of every dependent that matches this category
let unioned_downstream_graph = get_category_indexes(category.clone(), partition_node_id)?.iter()
.map(|index_id| get_downstream_graph(Some(category.clone()), *index_id))
.collect::<Result<Vec<_>>>()?.into_iter().flatten()
.collect::<HashMap<u32, proto::Component>>();
// these bindings shadow the outer `batches` / `partition_ids` inside this closure only
let (batches, partition_ids) = batch_partition(
&unioned_downstream_graph, &release_privacy_usages)?;
let batch_usages = batches.into_iter()
.map(|(_, batch)| compute_batch_privacy_usage(batch))
.fold1(|l, r| l? + r?)
.unwrap_or_else(|| Ok(zero_usage()))?;
batch_usages + compute_all_partitions_usage(partition_ids)?
})
.fold1(max_usage)
.unwrap_or_else(|| Ok(zero_usage()))
})
.fold1(|l, r| l? + r?)
.unwrap_or_else(|| Ok(zero_usage()))?;
// usage of the batches found directly in `graph`
let batch_usages = batches.into_iter()
.map(|(_, batch)| compute_batch_privacy_usage(batch))
.fold1(|l, r| l? + r?)
.unwrap_or_else(|| Ok(zero_usage()))?;
batch_usages + partitions_usage
}
/// Checks that a privacy usage carries sensible privacy parameters.
///
/// # Arguments
/// * `privacy_usage` - the usage to validate; its `distance` must be set
/// * `num_records` - number of records in the dataset, when known; used to bound delta
/// * `strict_parameter_check` - enables the large-epsilon warning, the `1 / num_records^2`
///   delta warning, and makes an unknown `num_records` an error when delta is positive
///
/// # Returns
/// A list of non-fatal warnings (possibly empty) when the parameters are acceptable.
///
/// # Errors
/// Returns an error when:
/// * the distance is missing,
/// * epsilon is NaN or not greater than 0,
/// * delta is NaN, negative, or not smaller than 1,
/// * delta is greater than `1 / num_records`,
/// * delta is positive, `num_records` is unknown and `strict_parameter_check` is set.
pub fn privacy_usage_check(
    privacy_usage: &proto::PrivacyUsage,
    num_records: Option<i64>,
    strict_parameter_check: bool,
) -> Result<Vec<Error>> {
    let mut warnings = Vec::new();

    match privacy_usage.distance
        .as_ref().ok_or_else(|| "usage distance must be defined")? {
        proto::privacy_usage::Distance::Approximate(usage) => {
            // NaN compares false against everything, so the `<= 0.0` test below
            // would silently accept it; reject it explicitly, as is done for delta
            if usage.epsilon.is_nan() {
                return Err("epsilon: must not be null".into());
            }
            if usage.epsilon <= 0.0 {
                return Err("epsilon: privacy parameter epsilon must be greater than 0".into());
            }
            if strict_parameter_check && usage.epsilon > 1.0 {
                warnings.push(format!("Warning: A large privacy parameter of epsilon = {} is in use", usage.epsilon).into());
            }

            // partial_cmp returns None only when delta is NaN
            match usage.delta.partial_cmp(&0.0)
                .ok_or_else(|| Error::from("delta: must not be null"))? {
                Ordering::Less => return Err("delta: privacy parameter may not be less than 0".into()),
                Ordering::Equal => (),
                Ordering::Greater => {
                    if usage.delta >= 1.0 {
                        return Err("delta: must be smaller than one".into());
                    }
                    match num_records {
                        Some(num_records) => {
                            let num_records = num_records as f64;
                            if usage.delta * num_records > 1.0 {
                                return Err("delta: a value greater than 1 / num_records is not differentially private".into());
                            }
                            // square in floating point: squaring the i64 overflows for
                            // record counts above roughly 3e9
                            if strict_parameter_check && usage.delta * num_records.powi(2) > 1.0 {
                                warnings.push("delta: a value greater than 1 / num_records^2 exposes individuals to significant risk".into());
                            }
                        }
                        None => if strict_parameter_check {
                            return Err("delta: the number of records must be known to check if delta is a value that satisfies differential privacy".into());
                        }
                    }
                }
            }
        }
    };

    Ok(warnings)
}
pub fn get_epsilon(usage: &proto::PrivacyUsage) -> Result<f64> {
match usage.distance.clone()
.ok_or_else(|| Error::from("distance must be defined on a PrivacyUsage"))? {
proto::privacy_usage::Distance::Approximate(distance) => Ok(distance.epsilon),
}
}
pub fn get_delta(usage: &proto::PrivacyUsage) -> Result<f64> {
match usage.distance.clone()
.ok_or_else(|| Error::from("distance must be defined on a PrivacyUsage"))? {
proto::privacy_usage::Distance::Approximate(distance) => Ok(distance.delta),
}
}
/// Expands privacy usages so that there is exactly one usage per output.
///
/// When `usages` already holds `length` entries it is returned unchanged (as a copy).
/// When it holds a single entry, that entry's epsilon and delta are divided evenly
/// into `length` identical usages.
///
/// # Errors
/// Returns an error when the number of usages is neither `length` nor 1, or when the
/// single usage has no `distance`.
pub fn spread_privacy_usage(usages: &[proto::PrivacyUsage], length: usize) -> Result<Vec<proto::PrivacyUsage>> {
    let given = usages.len();

    // already one usage per output: nothing to split
    if given == length {
        return Ok(usages.to_vec());
    }

    // anything other than a single usage cannot be spread
    if given != 1 && length == 1 {
        bail!("{} privacy parameters passed when one was required", given);
    }
    if given != 1 {
        bail!("{} privacy parameters passed when either 1 or {} was required", given, length);
    }

    let parts = length as f64;
    let share = match usages[0].distance.as_ref().ok_or("distance must be defined on a privacy usage")? {
        proto::privacy_usage::Distance::Approximate(approx) => proto::PrivacyUsage {
            distance: Some(proto::privacy_usage::Distance::Approximate(proto::privacy_usage::DistanceApproximate {
                epsilon: approx.epsilon / parts,
                delta: approx.delta / parts,
            }))
        }
    };

    // every output receives the same share of the budget
    Ok(vec![share; length])
}
/// Returns the group-id path shared by all arguments, excluding the last level.
///
/// Every argument must carry the same number of group ids, and at each level except
/// the last the group id must be identical across all arguments.
///
/// # Errors
/// Returns an error when the arguments have differing depths, when the depth is zero,
/// or when the group ids differ at any level before the last.
///
/// # Panics
/// None beyond those of `get_common_value`: indexing is bounded by the common depth.
pub fn get_group_id_path(arguments: Vec<Vec<GroupId>>) -> Result<Vec<GroupId>> {
    let depths: Vec<usize> = arguments.iter()
        .map(|group_ids| group_ids.len())
        .collect();
    let partition_depth = get_common_value(&depths)
        .ok_or_else(|| "all arguments must be parts of the same partition")?;

    if partition_depth == 0 {
        return Err("arguments must come from a partition".into());
    }

    // walk every level but the innermost, requiring agreement at each one
    let mut path = Vec::with_capacity(partition_depth - 1);
    for depth in 0..partition_depth - 1 {
        let level: Vec<GroupId> = arguments.iter()
            .map(|group_ids| group_ids[depth].clone())
            .collect();
        match get_common_value(&level) {
            Some(group_id) => path.push(group_id),
            None => return Err("partition paths of all arguments must match".into()),
        }
    }
    Ok(path)
}
/// Computes the c-stability multiplier for a set of arguments drawn from a partition.
///
/// Each argument is described by its path of group ids. The multiplier is the largest
/// number of arguments that share the same `index` at the innermost level of the path.
/// Arguments that do not come from a partition (depth 0) have a multiplier of 1.
///
/// # Errors
/// Returns an error when `arguments` is empty, when the arguments have differing
/// depths, when their group ids differ at any level before the last, or when their
/// innermost group ids refer to different `partition_id`s.
pub fn get_c_stability_multiplier(arguments: Vec<Vec<GroupId>>) -> Result<u32> {
    // must run before the common-depth lookup: no common value can exist for an
    // empty list, so checking afterwards would hide this more specific error
    if arguments.is_empty() {
        return Err("c-stability cannot be determined on an empty argument set".into());
    }

    let partition_depth = get_common_value(&arguments.iter()
        .map(|group_ids| group_ids.len())
        .collect())
        .ok_or_else(|| "all arguments must be parts of the same partition")?;

    if partition_depth == 0 {
        return Ok(1);
    }

    // every level above the innermost one must agree across all arguments
    // (the range is empty when the depth is exactly 1)
    let prefix_diverges = (0..partition_depth - 1).any(|depth|
        get_common_value(&arguments.iter()
            .map(|group_ids| group_ids[depth].clone())
            .collect()
        ).is_none());
    if prefix_diverges {
        return Err("all arguments must be parts of the same partition".into());
    }

    // innermost group id of each argument; every path has `partition_depth >= 1`
    // entries, so `pop` always yields a value here
    let group_ids = arguments.into_iter()
        .filter_map(|mut path| path.pop())
        .collect::<Vec<GroupId>>();

    get_common_value(&group_ids.iter()
        .map(|group_id| group_id.partition_id).collect())
        .ok_or_else(|| "all arguments must be parts of the same partition")?;

    // count how many arguments come from each index of the partition
    let mut counts = HashMap::new();
    for group_id in group_ids {
        *counts.entry(group_id.index).or_insert(0) += 1;
    }

    counts.values().max().copied()
        .ok_or_else(|| "c-stability cannot be determined on an empty argument set".into())
}