// Runtime evaluation of the `Materialize` component: reads a CSV file from disk
// and releases its contents as a dataframe of string columns.
use smartnoise_validator::errors::*;
use smartnoise_validator::components::Named;

use crate::NodeArguments;
use smartnoise_validator::base::{Value, ReleaseNode, IndexKey};
use indexmap::IndexMap;
use crate::components::Evaluable;

use smartnoise_validator::{proto};

impl Evaluable for proto::Materialize {
    fn evaluate(&self, _privacy_definition: &Option<proto::PrivacyDefinition>, arguments: NodeArguments) -> Result<ReleaseNode> {

        let column_names = self.get_names(
            arguments.iter().map(|(k, v)| (k.clone(), v)).collect(),
            IndexMap::new(), None)?;

        // num columns is sufficient shared information to build the dataframes
        let num_columns = column_names.len();

        let mut response = (0..num_columns)
            .map(|_| Vec::new())
            .collect::<Vec<Vec<String>>>();

        let mut reader = match csv::ReaderBuilder::new()
            .has_headers(self.skip_row)
            .from_path(self.file_path.clone()) {
            Ok(reader) => reader,
            Err(_) => return Err("the provided file path could not be found".into())
        };

        // parse from csv into response
        reader.deserialize().try_for_each(|result: std::result::Result<Vec<String>, _>| {

            // parse each record into the smartnoise internal format
            match result {
                Ok(record) => record.into_iter().enumerate()
                    .filter(|(idx, _)| idx < &num_columns)
                    .for_each(|(idx, value)| response[idx].push(value)),
                Err(e) => return Err(format!("{:?}", e).into())
            };
            Ok::<_, Error>(())
        })?;

        let num_nonempty_columns = response.iter()
            .filter(|col| !col.is_empty()).count();

        if 0 < num_nonempty_columns && num_nonempty_columns < num_columns {
            (num_nonempty_columns..num_columns).for_each(|idx|
                response[idx] = (0..response[0].len()).map(|_| "".to_string()).collect::<Vec<String>>())
        }

        Ok(ReleaseNode::new(Value::Dataframe(column_names.into_iter()
            .zip(response.into_iter())
            .map(|(key, value): (IndexKey, Vec<String>)|
                (key, ndarray::Array::from(value).into_dyn().into()))
            .collect::<IndexMap<IndexKey, Value>>())))
    }
}