cdpg_anonkit
A toolkit for data anonymisation.
Submodules
Classes
- SanitiseData
- GeneraliseData
- IncrementalGroupbyAggregator: Handles incremental aggregation of large datasets processed in chunks.
- DifferentialPrivacy: Differential Privacy mechanism with support for various noise addition strategies.
Package Contents
- class cdpg_anonkit.SanitiseData
- clip(min_value: float, max_value: float) → pandas.Series
Clip (limit) the values in a Series to a specified range.
- Parameters:
series (pd.Series) – The input Series to be clipped.
min_value (float) – The minimum value to clip to.
max_value (float) – The maximum value to clip to.
- Returns:
The clipped Series.
- Return type:
pd.Series
- hash_values(salt: str = '') → pandas.Series
Hash the values in a Series using the SHA-256 algorithm.
This can be used to pseudonymise values that need to be kept secret. The salt parameter can be used to add a common salt to all values. This can be useful if you want to combine the hashed values with other columns to create a unique identifier.
- Parameters:
series (pd.Series) – The input Series to be hashed.
salt (str, optional) – The salt to add to all values before hashing. Defaults to an empty string.
- Returns:
The hashed Series.
- Return type:
pd.Series
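The salted hashing described above can be sketched with the standard library alone. This is an illustration, not the library's implementation: it works on a plain list rather than a pandas Series, and it assumes the salt is appended to the string form of each value, which the documentation does not specify.

```python
import hashlib
from typing import Iterable, List


def hash_values(values: Iterable[object], salt: str = "") -> List[str]:
    """Return the hex SHA-256 digest of each value combined with a shared salt."""
    hashed = []
    for value in values:
        # Assumption: the salt is appended to the string form of the value.
        payload = f"{value}{salt}".encode("utf-8")
        hashed.append(hashlib.sha256(payload).hexdigest())
    return hashed


plain = hash_values(["alice", "bob", "alice"])
salted = hash_values(["alice", "bob", "alice"], salt="study-2024")
```

Equal inputs map to equal digests, so the pseudonyms can still be used for grouping and joins, while a different salt yields unrelated digests for the same inputs.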
- suppress(threshold: int = 5, replacement: str | int | float | None = None) → pandas.Series
Suppress all values in a Series that occur fewer times than a given threshold.
Every value whose number of occurrences is below the threshold is replaced with the replacement value.
- Parameters:
series (pd.Series) – The input Series to be suppressed.
threshold (int, optional) – The minimum number of occurrences for a value to be kept. Defaults to 5.
replacement (Optional[Union[str, int, float]], optional) – The value to replace suppressed values with. Defaults to None, which means that the values will be replaced with NaN.
- Returns:
The Series with suppressed values.
- Return type:
pd.Series
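A minimal sketch of frequency-based suppression, assuming a plain list in place of a pandas Series. With no replacement given it substitutes None, whereas the documented method uses NaN.

```python
from collections import Counter
from typing import Hashable, List, Optional


def suppress(values: List[Hashable], threshold: int = 5,
             replacement: Optional[Hashable] = None) -> List[Optional[Hashable]]:
    """Replace every value that occurs fewer than `threshold` times."""
    counts = Counter(values)
    return [v if counts[v] >= threshold else replacement for v in values]


# Illustrative data: "A" occurs three times, "B" twice, "C" once.
observed = ["A", "A", "A", "B", "B", "C"]
result = suppress(observed, threshold=3, replacement="other")
# result == ["A", "A", "A", "other", "other", "other"]
```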
- sanitise_data(columns_to_sanitise: List[str], sanitisation_rules: Dict[str, Dict[str, str | float | int | List | Dict]], drop_na: bool = False) → pandas.DataFrame
Sanitise a DataFrame by applying different methods to each column.
- Parameters:
df (pd.DataFrame) – The input DataFrame to be sanitised.
columns_to_sanitise (List[str]) – The columns in the DataFrame to be sanitised.
sanitisation_rules (Dict[str, Dict[str, Union[str, float, int, List, Dict]]]) – A dictionary that maps each column in columns_to_sanitise to a dictionary that specifies the sanitisation method and parameters for that column. The dictionary should contain the following keys:
* ‘method’: str, the sanitisation method to use
* ‘params’: Dict[str, Union[str, float, int, List, Dict]], the parameters for the sanitisation method
drop_na (bool, optional) – If True, drop all rows in the DataFrame that have any NaN values in the columns specified in columns_to_sanitise. Defaults to False.
- Returns:
The sanitised DataFrame.
- Return type:
pd.DataFrame
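The shape of sanitisation_rules is easiest to see in a small dispatch sketch. Everything here is hypothetical: the method name "clip", the METHODS registry, and the dict-of-lists table are stand-ins chosen for illustration, since the accepted method names are not listed above.

```python
from typing import Any, Callable, Dict, List


def clip(values: List[float], min_value: float, max_value: float) -> List[float]:
    """Hypothetical stand-in for one sanitisation method."""
    return [min(max(v, min_value), max_value) for v in values]


# Hypothetical registry mapping a rule's 'method' string to a function.
METHODS: Dict[str, Callable[..., List[Any]]] = {"clip": clip}


def sanitise(table: Dict[str, List[Any]], columns_to_sanitise: List[str],
             sanitisation_rules: Dict[str, Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Apply the rule registered for each listed column; other columns pass through."""
    result = dict(table)  # shallow copy so the input mapping is not modified
    for column in columns_to_sanitise:
        rule = sanitisation_rules[column]
        method = METHODS[rule["method"]]
        result[column] = method(table[column], **rule.get("params", {}))
    return result


rules = {"age": {"method": "clip", "params": {"min_value": 18, "max_value": 90}}}
table = {"age": [12, 35, 101], "city": ["A", "B", "C"]}
clean = sanitise(table, ["age"], rules)
```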
- class cdpg_anonkit.GeneraliseData
- class SpatialGeneraliser
- static format_coordinates(series: pandas.Series) → Tuple[pandas.Series, pandas.Series]
Clean coordinates attribute formatting.
Takes a pandas Series of coordinates and returns a tuple of two Series: the first with the latitude, and the second with the longitude.
The coordinates are expected to be in the format “[lat, lon]”. The function will strip any leading or trailing whitespace and brackets from the coordinates, split them into two parts, and convert each part to a float.
If the coordinate string is not in the expected format, a ValueError is raised.
- Parameters:
series (pd.Series) – The series of coordinates to be cleaned.
- Returns:
A tuple of two Series, one with the latitude and one with the longitude.
- Return type:
Tuple[pd.Series, pd.Series]
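The parsing steps described above (strip whitespace and brackets, split, convert to float) can be sketched as follows. This version works on a list of strings rather than a Series and is only an approximation of the documented behaviour.

```python
from typing import List, Tuple


def format_coordinates(values: List[str]) -> Tuple[List[float], List[float]]:
    """Split '[lat, lon]' strings into separate latitude and longitude lists."""
    latitudes: List[float] = []
    longitudes: List[float] = []
    for raw in values:
        # Remove surrounding whitespace, then the enclosing brackets.
        parts = raw.strip().strip("[]").split(",")
        if len(parts) != 2:
            raise ValueError(f"Expected '[lat, lon]', got {raw!r}")
        # float() tolerates surrounding spaces and raises ValueError on bad input.
        latitudes.append(float(parts[0]))
        longitudes.append(float(parts[1]))
    return latitudes, longitudes


lat, lon = format_coordinates([" [12.97, 77.59] ", "[18.52,73.85]"])
```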
- static generalise_spatial(latitude: pandas.Series, longitude: pandas.Series, spatial_resolution: int) → pandas.Series
Generalise a set of coordinates to an H3 index at a given resolution.
- Parameters:
latitude (pd.Series) – The series of latitude values to be generalised.
longitude (pd.Series) – The series of longitude values to be generalised.
spatial_resolution (int) – The spatial resolution of the H3 index. Must be between 0 and 15.
- Returns:
A series of H3 indices at the specified resolution.
- Return type:
pd.Series
- Raises:
ValueError – If the spatial resolution is not between 0 and 15, or if any latitude is outside the range -90 to 90 or any longitude is outside the range -180 to 180.
Warning – If the lengths of the latitude and longitude series are not equal.
- class TemporalGeneraliser
- static format_timestamp(series: pandas.Series) → pandas.Series
Convert a pandas Series of timestamps into datetime objects.
This function takes a Series containing timestamp data and converts it into pandas datetime objects. It handles mixed format timestamps and coerces any non-parseable values into NaT (Not a Time).
- Parameters:
series (pd.Series) – The input Series containing timestamp data to be converted.
- Returns:
A Series where all timestamp values have been converted to datetime objects, with non-parseable values set to NaT.
- Return type:
pd.Series
- static generalise_temporal(data: pandas.Series | pandas.DataFrame, timestamp_col: str = None, temporal_resolution: int = 60) → pandas.Series
Generalise timestamp data into specified temporal resolutions.
This function processes timestamp data, either in the form of a Series or a DataFrame, and generalises it into timeslots based on the specified temporal resolution. The resolution must be one of the following values: 15, 30, or 60 minutes.
- Parameters:
data (Union[pd.Series, pd.DataFrame]) – The input timestamp data. Can be a pandas Series of datetime objects or a DataFrame containing a column with datetime data.
timestamp_col (str, optional) – The name of the column containing timestamp data in the DataFrame. Must be specified if the input data is a DataFrame. Defaults to None.
temporal_resolution (int, optional) – The temporal resolution in minutes for which the timestamps should be generalised. Allowed values are 15, 30, or 60. Defaults to 60.
- Returns:
A pandas Series representing the generalised timeslots, with each entry formatted as ‘hour_minute’, indicating the start of the timeslot.
- Return type:
pd.Series
- Raises:
AssertionError – If the temporal resolution is not one of the allowed values (15, 30, 60).
ValueError – If timestamp_col is not specified when the input data is a DataFrame, if the specified column is not found in the DataFrame, or if the timestamps cannot be converted to datetime objects.
TypeError – If the input data is neither a pandas Series nor a DataFrame.
Example
# Using with a Series
generalise_temporal(ts_series)
# Using with a DataFrame
generalise_temporal(df, timestamp_col='timestamp')
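How a single timestamp maps to a timeslot can be sketched with the standard library. The helper name to_timeslot is hypothetical, and the exact rendering of the ‘hour_minute’ label (for example, whether values are zero-padded) is an assumption.

```python
from datetime import datetime

ALLOWED_RESOLUTIONS = (15, 30, 60)


def to_timeslot(ts: datetime, temporal_resolution: int = 60) -> str:
    """Floor a timestamp to the start of its timeslot and label it 'hour_minute'."""
    assert temporal_resolution in ALLOWED_RESOLUTIONS, "resolution must be 15, 30 or 60"
    # Integer division floors the minute to the start of the slot.
    slot_minute = (ts.minute // temporal_resolution) * temporal_resolution
    # Assumption: hour and minute are rendered without zero padding.
    return f"{ts.hour}_{slot_minute}"


moment = datetime(2024, 1, 1, 14, 47)
slots = [to_timeslot(moment, r) for r in ALLOWED_RESOLUTIONS]
# slots == ["14_45", "14_30", "14_0"]
```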
- class CategoricalGeneraliser
- static generalise_categorical(data: pandas.Series, bins: int | List[float], labels: List[str] | None = None) → pandas.Series
Generalise a categorical column by binning the values into categories.
- Parameters:
data (pd.Series) – The input Series to be generalised.
bins (Union[int, List[float]]) – The number of bins to use, or a list of bin edges.
labels (Optional[List[str]], optional) – The labels to use for each bin. If not specified, the bin edges will be used as labels.
- Returns:
The generalised Series.
- Return type:
pd.Series
- class cdpg_anonkit.IncrementalGroupbyAggregator(group_columns: str | List[str], agg_column: str, agg_func: Literal['sum', 'count', 'min', 'max', 'mean'])
Handles incremental aggregation of large datasets processed in chunks.
Carefully merges chunk-level statistics to ensure correct final aggregation.
- group_columns
- agg_column
- agg_func
- _group_stats: Dict[tuple, Dict[str, Any]]
- _merge_chunk_stats(existing: Dict[str, Any], new_chunk: Dict[str, Any]) → Dict[str, Any]
Merge chunk-level statistics into existing statistics.
- Parameters:
existing (Dict[str, Any]) – The existing statistics to merge into.
new_chunk (Dict[str, Any]) – The new chunk statistics to merge.
- Returns:
The merged statistics.
- Return type:
Dict[str, Any]
- Raises:
ValueError – If the aggregation function is not one of {‘mean’, ‘sum’, ‘min’, ‘max’, ‘count’}.
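Why merging needs care is clearest for ‘mean’: averaging per-chunk means gives the wrong answer when chunks differ in size, so a running sum and count are kept instead. The sketch below assumes a hypothetical layout for the statistics dictionaries ("value", "sum", "count" keys); the library's internal layout is not documented here.

```python
from typing import Any, Dict


def merge_chunk_stats(existing: Dict[str, Any], new_chunk: Dict[str, Any],
                      agg_func: str) -> Dict[str, Any]:
    """Combine per-group statistics from two chunks (hypothetical dict layout)."""
    if agg_func in ("sum", "count"):
        return {"value": existing["value"] + new_chunk["value"]}
    if agg_func == "min":
        return {"value": min(existing["value"], new_chunk["value"])}
    if agg_func == "max":
        return {"value": max(existing["value"], new_chunk["value"])}
    if agg_func == "mean":
        # Keep sum and count separately; the mean is derived only at the end.
        return {"sum": existing["sum"] + new_chunk["sum"],
                "count": existing["count"] + new_chunk["count"]}
    raise ValueError(f"Unsupported aggregation function: {agg_func}")


chunk_a = [1.0, 2.0, 3.0]
chunk_b = [4.0, 5.0]
stats_a = {"sum": sum(chunk_a), "count": len(chunk_a)}
stats_b = {"sum": sum(chunk_b), "count": len(chunk_b)}

merged = merge_chunk_stats(stats_a, stats_b, "mean")
overall_mean = merged["sum"] / merged["count"]  # 15.0 / 5 = 3.0
# Averaging the two chunk means instead would give (2.0 + 4.5) / 2 = 3.25.
naive_mean = (sum(chunk_a) / len(chunk_a) + sum(chunk_b) / len(chunk_b)) / 2
```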
- process_chunk(chunk: pandas.DataFrame)
Process a chunk of data by performing aggregation and updating internal statistics.
This method processes a given data chunk by validating its columns, performing groupby aggregation based on the specified aggregation function, and merging the computed statistics into the internal storage for incremental aggregation.
- Parameters:
chunk (pd.DataFrame) – A DataFrame representing a chunk of data to be processed. It must contain the columns specified in self.group_columns and self.agg_column.
- Raises:
ValueError – If any of the required columns specified in self.group_columns or self.agg_column are not found in the chunk, or if the aggregation function is unsupported.
- get_final_result() → pandas.DataFrame
Return the final result as a DataFrame after all chunks have been processed.
After all chunks have been processed using process_chunk, this method returns a DataFrame containing the final result of the aggregation. The columns of the DataFrame include the group columns and the aggregated column with a name based on the specified aggregation function (e.g. ‘mean’, ‘sum’, ‘min’, ‘max’, or ‘count’).
- Returns:
The final result of the aggregation.
- Return type:
pd.DataFrame
- class cdpg_anonkit.DifferentialPrivacy(mechanism: Literal['laplace', 'gaussian', 'exponential'], epsilon: float = 1.0, delta: float | None = None, sensitivity: float | None = None)
Differential Privacy mechanism with support for various noise addition strategies.
Focuses on fundamental DP principles with extensibility in mind.
- mechanism
- epsilon = 1.0
- delta = None
- _sensitivity = None
- static clip_count(count: int, lower_bound: int = 0, upper_bound: int | None = None) → int
Clip the count to a specified range.
This static method ensures that the count falls within the specified lower and upper bounds. If no upper bound is provided, only the lower bound is enforced.
- Parameters:
count (int) – The original count to be clipped.
lower_bound (int, optional) – The minimum value to clip to, by default 0.
upper_bound (Optional[int], optional) – The maximum value to clip to, by default None.
- Returns:
The clipped count, constrained by the specified bounds.
- Return type:
int
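The clipping rule amounts to a clamp with an optional upper limit. A minimal sketch as a standalone function, not the library's code:

```python
from typing import Optional


def clip_count(count: int, lower_bound: int = 0,
               upper_bound: Optional[int] = None) -> int:
    """Clamp a count to [lower_bound, upper_bound]; skip the upper limit if None."""
    clipped = max(count, lower_bound)
    if upper_bound is not None:
        clipped = min(clipped, upper_bound)
    return clipped


examples = [clip_count(-3), clip_count(12, 0, 10), clip_count(12)]
# examples == [0, 10, 12]
```

Clipping after noise addition is a common way to keep released counts non-negative.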
- compute_sensitivity(query_type: str = 'count', lower_bound: int = 0, upper_bound: int | None = None) → float
Compute the sensitivity of a query based on its type and bounds.
Sensitivity measures how much the output of a query can change when a single record in the dataset is modified. It determines how much noise a differential privacy mechanism must add.
- Parameters:
query_type (str, optional) – The type of query for which sensitivity is being computed. Currently supported: ‘count’. Defaults to ‘count’.
lower_bound (int, optional) – The minimum value constraint for the query. Defaults to 0.
upper_bound (Optional[int], optional) – The maximum value constraint for the query. If None, no upper bound is considered. Defaults to None.
- Returns:
The sensitivity of the query. For ‘count’ queries, this is 1.0.
- Return type:
float
- Raises:
ValueError – If the sensitivity computation for the specified query_type is not implemented.
- add_noise(value: int | float, sensitivity: float | None = None, epsilon: float | None = None) → int | float
Add noise to a given value according to the specified differential privacy mechanism.
Depending on the mechanism set during initialization, this method adds noise to the input value to provide differential privacy. Currently, only the Laplace mechanism is implemented; support for the Gaussian and Exponential mechanisms is planned.
- Parameters:
value (Union[int, float]) – The original value to which noise will be added.
sensitivity (Optional[float], optional) – The sensitivity of the query. If not provided, the class-level sensitivity will be used. Defaults to None.
epsilon (Optional[float], optional) – The privacy budget. If not provided, the class-level epsilon will be used. Defaults to None.
- Returns:
The value with added noise according to the specified mechanism.
- Return type:
Union[int, float]
- Raises:
ValueError – If the Gaussian mechanism is selected but delta is not specified, or if the mechanism is unsupported.
NotImplementedError – If the Gaussian or Exponential mechanism is selected, as they are not yet implemented.
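For reference, the standard Laplace mechanism draws noise from a Laplace distribution with scale sensitivity / epsilon. The sketch below uses only the standard library; the function name and the rng parameter are hypothetical, and the library may sample differently (for example via NumPy).

```python
import random
from typing import Optional


def add_laplace_noise(value: float, sensitivity: float = 1.0, epsilon: float = 1.0,
                      rng: Optional[random.Random] = None) -> float:
    """Return value plus Laplace(0, sensitivity / epsilon) noise."""
    if epsilon <= 0 or sensitivity <= 0:
        raise ValueError("epsilon and sensitivity must be positive")
    rng = rng or random.Random()
    scale = sensitivity / epsilon
    # The difference of two independent exponential variables with mean `scale`
    # follows a Laplace distribution with location 0 and that scale.
    noise = rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)
    return value + noise


rng = random.Random(0)  # seeded only to make the illustration repeatable
samples = [add_laplace_noise(100, sensitivity=1.0, epsilon=1.0, rng=rng)
           for _ in range(20000)]
sample_mean = sum(samples) / len(samples)  # close to 100, since the noise has mean 0
```

A smaller epsilon gives a larger scale and therefore noisier, more private outputs.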