datapyground.compute.aggregate¶
Query plan nodes that aggregations.
Frequently when analysing data is necessary to compute statistics like the min, max, average, etc… of the data stored in datasets.
The aggregate node is in charge of computing those aggregations and projecting them as new columns in a query pipeline.
Typically the aggregate node will group the data by a set of columns and then compute the aggregations
For example, given the following data:
city, shop, n_employees
New York, Shop A, 10
New York, Shop B, 15
Los Angeles, Shop C, 8
Los Angeles, Shop D, 12
New York, Shop E, 20
We could group by city and compute the sum of the employees to get:
city, total_employees
New York, 45
Los Angeles, 20
Classes
- class datapyground.compute.aggregate.AggregateNode(keys: list[str], aggregations: dict[str, Aggregation], child: QueryPlanNode)[source]¶
Group data and compute aggregations.
>>> import pyarrow as pa >>> import pyarrow.compute as pc >>> from datapyground.compute import col, lit, SumAggregation, PyArrowTableDataSource >>> data = pa.record_batch({ ... 'city': pa.array(['New York', 'New York', 'Los Angeles', 'Los Angeles', 'New York']), ... 'shop': pa.array(['Shop A', 'Shop B', 'Shop C', 'Shop D', 'Shop E']), ... 'n_employees': pa.array([10, 15, 8, 12, 20]) ... }) >>> aggregate = AggregateNode(["city"], {"total_employees": SumAggregation("n_employees")}, PyArrowTableDataSource(data)) >>> next(aggregate.batches()) pyarrow.RecordBatch city: string total_employees: int64 ---- city: ["New York","Los Angeles"] total_employees: [45,20]
- Parameters:
keys – The columns to group by.
aggregations – The aggregations to compute in the form of {“new_col_name”: Aggregation}.
child – The child node that will provide the data to aggregate.
- batches() Generator[RecordBatch, None, None][source]¶
Apply the filtering to the child node.
For each recordbatch yielded by the child node, apply the expression and get back a mask (an array of only true/false values).
Based on the mask filter the rows of the batch and return only those matching the filter.
- single_key_aggregation() Generator[RecordBatch, None, None][source]¶
Compute the aggregation for a single key.
This is an optimized path where we can rely on dictionary encoding to find the unique values of the key column and then filter the rows.
- multi_key_aggregation() Generator[RecordBatch, None, None][source]¶
Compute the aggregation for multiple keys.
In this case we will have to manually implement the grouping as we can’t rely on dictionary encoding to find the unique values
- reduce_aggregations(chunks_data: dict[Any, dict[str, list[Scalar]]]) RecordBatch[source]¶
Reduce the partial aggregation results to the final aggregation results.
Both single and multi key aggregation will end up computing the aggregations for each chunk separately, this method will reduce the partial aggregation results to the final aggregation results.
For example if we had 3 chunks and the chunks_data is:
{"New York": {"total_employees": [10, 20, 30]}}
The result will be:
{"New York": {"total_employees": 60}}
- class datapyground.compute.aggregate.Aggregation(column: str)[source]¶
Base class for aggregations.
Every aggregation is expected to implement a method to compute any needed intermediate results on a single chunk of data and then provide a reduce method to combine the intermediate results into a final result.
- abstract compute_chunk(batch: RecordBatch) Any[source]¶
Compute the aggregation partial results for a single chunk of data.
This method can return any intermediate result that will be combined by the reduce method to get the final result.
Generally this will be a scalar, but in some cases it might return more complex data structures. See
MeanAggregationfor an example.
- class datapyground.compute.aggregate.CountAggregation(column: str)[source]¶
Compute the count of an aggregated column.
This is based on computing the counts for each intermediate batch and then sum them to compute the final result.
- compute_chunk(batch: RecordBatch) Any[source]¶
Compute the count of the column in a single batch.
- class datapyground.compute.aggregate.MaxAggregation(column: str)[source]¶
Compute the max of an aggregated column.
- class datapyground.compute.aggregate.MeanAggregation(column: str)[source]¶
Compute the mean of an aggregated column.
This is based by computing count and sum of the column for each intermediate batch and then dividing the sum of all intermediate results by the count of all intermediate results.
- compute_chunk(batch: RecordBatch) Any[source]¶
Compute the count and sum of the column in a single batch.
- class datapyground.compute.aggregate.MinAggregation(column: str)[source]¶
Compute the min of an aggregated column.
- class datapyground.compute.aggregate.SimpleAggregation(column: str)[source]¶
Provide a base implementation for simple aggregations like min,max,sum.
Simple aggregations are those where the function applied to compute intermediate results for a single chunk of data is the same as the function applied to combine the intermediate results into the final.
For example
sum([1, 2, 3])is the same assum([sum([1, 2]), 3]).- compute_chunk(batch: RecordBatch) Any[source]¶
Compute the aggregation partial results for a single chunk of data.