cdpg_anonkit.aggregation

Classes

IncrementalGroupbyAggregator

Handles incremental aggregation of large datasets processed in chunks.

Functions

example_usage()

Module Contents

class cdpg_anonkit.aggregation.IncrementalGroupbyAggregator(group_columns: str | List[str], agg_column: str, agg_func: Literal['sum', 'count', 'min', 'max', 'mean'])

Handles incremental aggregation of large datasets processed in chunks.

Merges chunk-level statistics per group so that the final aggregation is correct even when a group's rows are spread across several chunks.
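The stdlib-only sketch below is an illustration, not the library's code. It shows why chunk-level statistics need careful merging for 'mean': averaging per-chunk means goes wrong when a group's rows are split unevenly across chunks, while carrying a running sum and count per group does not. Plain lists of (group, value) tuples stand in for DataFrame chunks; whether IncrementalGroupbyAggregator stores sums and counts internally is an assumption.

```python
from collections import defaultdict

# Two chunks of (group, value) rows; group "a" is split across both chunks.
chunks = [
    [("a", 1.0), ("a", 2.0), ("b", 10.0)],
    [("a", 9.0)],
]

# Naive merge: average the per-chunk means (wrong when chunk sizes differ).
chunk_means = defaultdict(list)
for chunk in chunks:
    per_chunk = defaultdict(list)
    for group, value in chunk:
        per_chunk[group].append(value)
    for group, values in per_chunk.items():
        chunk_means[group].append(sum(values) / len(values))
naive = {g: sum(means) / len(means) for g, means in chunk_means.items()}

# Careful merge: carry a running [sum, count] per group and divide only at the end.
totals = defaultdict(lambda: [0.0, 0])
for chunk in chunks:
    for group, value in chunk:
        totals[group][0] += value
        totals[group][1] += 1
correct = {g: s / n for g, (s, n) in totals.items()}

print(naive["a"])    # 5.25, the mean of the chunk means 1.5 and 9.0
print(correct["a"])  # 4.0, i.e. (1.0 + 2.0 + 9.0) / 3
```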

group_columns
agg_column
agg_func
_group_stats: Dict[tuple, Dict[str, Any]]
_merge_chunk_stats(existing: Dict[str, Any], new_chunk: Dict[str, Any]) → Dict[str, Any]

Merge chunk-level statistics into existing statistics.

Parameters:

  • existing (Dict[str, Any]) – The existing statistics to merge into.

  • new_chunk (Dict[str, Any]) – The new chunk statistics to merge.

Returns:

The merged statistics.

Return type:

Dict[str, Any]

Raises:

ValueError – If the aggregation function is not one of {‘mean’, ‘sum’, ‘min’, ‘max’, ‘count’}.
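A possible merge rule per aggregation function is sketched below. It is a hypothetical standalone function, not the private method itself: agg_func is passed explicitly instead of being read from self, and the statistics layout (keys 'sum', 'count', 'min', 'max') is an assumption, since the real Dict[str, Any] contents are not documented here.

```python
from typing import Any, Dict

SUPPORTED_FUNCS = {"mean", "sum", "min", "max", "count"}


def merge_chunk_stats(
    agg_func: str, existing: Dict[str, Any], new_chunk: Dict[str, Any]
) -> Dict[str, Any]:
    """Merge one group's chunk statistics into its existing statistics.

    Hypothetical stats layout: 'sum' and 'count' for mean, 'sum' for sum,
    'count' for count, 'min' for min and 'max' for max.
    """
    if agg_func not in SUPPORTED_FUNCS:
        raise ValueError(f"Unsupported aggregation function: {agg_func!r}")
    merged = dict(existing)  # leave the caller's dict untouched
    if agg_func == "mean":
        # A mean can only be rebuilt from combined sums and counts.
        merged["sum"] = existing["sum"] + new_chunk["sum"]
        merged["count"] = existing["count"] + new_chunk["count"]
    elif agg_func == "sum":
        merged["sum"] = existing["sum"] + new_chunk["sum"]
    elif agg_func == "count":
        merged["count"] = existing["count"] + new_chunk["count"]
    elif agg_func == "min":
        merged["min"] = min(existing["min"], new_chunk["min"])
    else:  # "max"
        merged["max"] = max(existing["max"], new_chunk["max"])
    return merged


print(merge_chunk_stats("mean", {"sum": 3.0, "count": 2}, {"sum": 9.0, "count": 1}))
# {'sum': 12.0, 'count': 3}
```

Sums and counts combine by addition and minima and maxima by min/max, so each rule is associative and the merge order of chunks does not affect the result.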

process_chunk(chunk: pandas.DataFrame)

Process a chunk of data by performing aggregation and updating internal statistics.

It checks that the chunk contains the required columns, runs a groupby aggregation on the chunk according to agg_func, and merges the resulting per-group statistics into the internal store (_group_stats) used for incremental aggregation.

Parameters:

chunk (pd.DataFrame) – A DataFrame representing a chunk of data to be processed. It must contain the columns specified in self.group_columns and self.agg_column.

Raises:

ValueError – If any of the required columns specified in self.group_columns or self.agg_column are not found in the chunk, or if the aggregation function is unsupported.
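The validate, aggregate, merge sequence described above can be sketched without pandas as follows. This is a hypothetical helper, not the library's method: a dict of column lists stands in for the DataFrame chunk, group keys are tuples to match the Dict[tuple, ...] type of _group_stats, and for simplicity every group keeps sum, count, min and max regardless of agg_func (which is only validated here).

```python
from typing import Any, Dict, List, Sequence, Tuple, Union

SUPPORTED_FUNCS = {"mean", "sum", "min", "max", "count"}
Stats = Dict[str, Any]


def aggregate_chunk(
    chunk: Dict[str, List[Any]], group_columns: Sequence[str], agg_column: str
) -> Dict[Tuple, Stats]:
    """Per-chunk groupby: one sum/count/min/max record per group-key tuple."""
    local: Dict[Tuple, Stats] = {}
    for i, value in enumerate(chunk[agg_column]):
        key = tuple(chunk[c][i] for c in group_columns)
        s = local.setdefault(key, {"sum": 0, "count": 0, "min": value, "max": value})
        s["sum"] += value
        s["count"] += 1
        s["min"] = min(s["min"], value)
        s["max"] = max(s["max"], value)
    return local


def process_chunk(
    chunk: Dict[str, List[Any]],
    group_columns: Union[str, Sequence[str]],
    agg_column: str,
    agg_func: str,
    group_stats: Dict[Tuple, Stats],
) -> None:
    """Validate a chunk, aggregate it, and merge the result into group_stats in place."""
    if isinstance(group_columns, str):
        group_columns = [group_columns]
    missing = [c for c in [*group_columns, agg_column] if c not in chunk]
    if missing:
        raise ValueError(f"Columns not found in chunk: {missing}")
    if agg_func not in SUPPORTED_FUNCS:
        raise ValueError(f"Unsupported aggregation function: {agg_func!r}")
    for key, new in aggregate_chunk(chunk, group_columns, agg_column).items():
        old = group_stats.get(key)
        if old is None:
            group_stats[key] = new
        else:
            group_stats[key] = {
                "sum": old["sum"] + new["sum"],
                "count": old["count"] + new["count"],
                "min": min(old["min"], new["min"]),
                "max": max(old["max"], new["max"]),
            }


stats: Dict[Tuple, Stats] = {}
process_chunk({"region": ["n", "n", "s"], "amount": [1, 2, 10]}, "region", "amount", "mean", stats)
process_chunk({"region": ["n"], "amount": [9]}, "region", "amount", "mean", stats)
print(stats[("n",)])  # {'sum': 12, 'count': 3, 'min': 1, 'max': 9}
```

Aggregating each chunk before merging keeps the number of merge operations proportional to the number of distinct groups per chunk rather than the number of rows.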

get_final_result() → pandas.DataFrame

Return the final result as a DataFrame after all chunks have been processed.

Call this once every chunk has been passed to process_chunk. The returned DataFrame contains the group columns plus the aggregated column, whose name is derived from the aggregation function (one of ‘mean’, ‘sum’, ‘min’, ‘max’, or ‘count’).

Returns:

The final result of the aggregation.

Return type:

pd.DataFrame
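The final step, turning merged per-group statistics into one output row per group, might look like the sketch below. It is hypothetical: the stats layout (keys 'sum', 'count', 'min', 'max') and the output column name f"{agg_column}_{agg_func}" are assumptions, since the exact naming used by get_final_result is not documented here. The function returns a dict of column lists rather than a DataFrame to stay dependency-free.

```python
from typing import Any, Dict, List, Sequence, Tuple, Union

SUPPORTED_FUNCS = {"mean", "sum", "min", "max", "count"}


def final_result(
    group_stats: Dict[Tuple, Dict[str, Any]],
    group_columns: Union[str, Sequence[str]],
    agg_column: str,
    agg_func: str,
) -> Dict[str, List[Any]]:
    """Turn merged per-group statistics into column-oriented output."""
    if agg_func not in SUPPORTED_FUNCS:
        raise ValueError(f"Unsupported aggregation function: {agg_func!r}")
    if isinstance(group_columns, str):
        group_columns = [group_columns]
    out_name = f"{agg_column}_{agg_func}"  # hypothetical naming scheme
    result: Dict[str, List[Any]] = {c: [] for c in group_columns}
    result[out_name] = []
    for key, stats in group_stats.items():
        for col, part in zip(group_columns, key):
            result[col].append(part)
        # The mean is finalized only here, from the merged sum and count.
        value = stats["sum"] / stats["count"] if agg_func == "mean" else stats[agg_func]
        result[out_name].append(value)
    return result


stats = {
    ("n",): {"sum": 12, "count": 3, "min": 1, "max": 9},
    ("s",): {"sum": 10, "count": 1, "min": 10, "max": 10},
}
print(final_result(stats, "region", "amount", "mean"))
# {'region': ['n', 's'], 'amount_mean': [4.0, 10.0]}
```

Passing the returned dict to pandas.DataFrame would produce a table with one row per group.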

cdpg_anonkit.aggregation.example_usage()