datapyground.compute.sorting
Query plan nodes that perform sorting of data.
When computing ranks or looking for most significant values, it’s often necessary to sort the data based on one or more columns.
Sorted data can also benefit aggregations which will be able to run faster as all the values that constitute a group are in succession.
This module implements the sorting capabilities.
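As a rough illustration of why sorted input helps aggregations, the sketch below uses only the Python standard library rather than this module. The `rows` data and the column layout are made up for the example. Once rows are sorted by the grouping key, each group is a contiguous run and can be reduced in a single pass.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (key, value) rows, not taken from the library.
rows = [("b", 1), ("a", 2), ("b", 3), ("a", 4)]

# Sorting by the grouping key places each group's rows next to each other.
rows.sort(key=itemgetter(0))

# groupby only merges adjacent equal keys, so it relies on sorted input
# and can emit each group's total as soon as the key changes.
totals = {
    key: sum(value for _, value in group)
    for key, group in groupby(rows, key=itemgetter(0))
}
```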
Classes
- class datapyground.compute.sorting.ExternalSortKey(row: RecordBatch, keys_indices: list[int], descending_orders: list[bool])
Makes pyarrow data sortable by Python functions.
This implements the rich comparison methods to allow sorting of pyarrow data based on the values of the columns in the order they are provided.
- Parameters:
row – The RecordBatch row to compare.
keys_indices – The indices of the columns to use for comparison.
descending_orders – For each key column, whether its values are compared in descending order.
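The idea of a sort key built on rich comparison methods can be sketched in plain Python. This is not the library's implementation: `RowSortKey` is a hypothetical class that works on tuples instead of pyarrow rows, and it assumes one descending flag per compared column.

```python
from functools import total_ordering


@total_ordering
class RowSortKey:
    """Hypothetical multi-column sort key (not the library's class)."""

    def __init__(self, values, descending):
        self.values = values          # tuple of column values for one row
        self.descending = descending  # one flag per column

    def __eq__(self, other):
        return self.values == other.values

    def __lt__(self, other):
        # Compare column by column; the first differing column decides.
        for mine, theirs, desc in zip(self.values, other.values, self.descending):
            if mine == theirs:
                continue
            return mine > theirs if desc else mine < theirs
        return False  # all columns are equal


rows = [("b", 1), ("a", 2), ("b", 3), ("a", 1)]
# First column ascending, second column descending.
ordered = sorted(rows, key=lambda row: RowSortKey(row, [False, True]))
```

Wrapping each row in a key object lets standard tools such as `sorted` or `heapq.merge` order rows without knowing anything about the per-column directions.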
- class datapyground.compute.sorting.ExternalSortNode(keys: list[str], descending: list[bool], child: QueryPlanNode, batch_size: int = 1024)
Sort data based on one or more columns offloading to disk.
This node behaves similarly to SortNode but performs the sorting operation by offloading the data to disk. It expects each chunk to fit into memory individually, which is a reasonable expectation given that the child node generated them without running out of memory, but it can work with datasets that are larger than the available memory.
The performance is currently suboptimal because the merge of the sorted batches has to happen in Python, and thus all values have to be converted to Python objects, but the memory consumption is greatly reduced compared to SortNode.
These were the measurements on the development machine:
InMemory -> TIME: 2.0s MEMORY: 772MB
External -> TIME: 69.6s MEMORY: 92MB
This confirms the expected memory improvement provided by the external sorting implementation, at the cost of a much longer run time.
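The general external merge sort technique can be sketched with the standard library alone. This is only an illustration under simplifying assumptions, not the node's actual code. The function `external_sort` is hypothetical, the values are plain integers written as text lines instead of Arrow batches, and only ascending order is handled.

```python
import heapq
import os
import tempfile


def external_sort(chunks):
    """Sort an iterable of in-memory chunks, spilling sorted runs to disk."""
    paths = []
    try:
        for chunk in chunks:
            # Each chunk is assumed to fit in memory, so it is sorted directly
            # and written out as one sorted run.
            fd, path = tempfile.mkstemp(suffix=".run")
            paths.append(path)
            with os.fdopen(fd, "w") as run:
                run.writelines(f"{value}\n" for value in sorted(chunk))

        files = [open(path) for path in paths]
        try:
            # Runs are read lazily, so the merge holds about one value per run.
            runs = [(int(line) for line in f) for f in files]
            yield from heapq.merge(*runs)
        finally:
            for f in files:
                f.close()
    finally:
        # Remove the temporary runs once the merge ends or is abandoned.
        for path in paths:
            os.remove(path)


merged = list(external_sort([[5, 1, 9], [4, 8], [7, 2, 3]]))
```

The merge step here runs entirely in Python, value by value, which is the same kind of cost the timings above reflect.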
>>> import pyarrow as pa
>>> from datapyground.compute import PyArrowTableDataSource
>>> from datapyground.compute.sorting import ExternalSortNode
>>> data = pa.record_batch({"values": [1, 2, 3, 4, 5]})
>>> # Sort the data in descending order
>>> sort = ExternalSortNode(["values"], [True], PyArrowTableDataSource(data))
>>> next(sort.batches())
pyarrow.RecordBatch
values: int64
----
values: [5,4,3,2,1]
- Parameters:
keys – The columns to sort by in the order they should be sorted.
descending – Whether each column should be sorted in descending order.
child – The node emitting the data to be sorted.
- batches() → Generator[RecordBatch, None, None]
Apply the sorting to the child node.
Each batch yielded by the child node will be sorted individually based on the provided keys, then the results will be merged.
This requires loading all the data in memory. Currently pyarrow doesn’t seem to offer any way to implement an external sorting algorithm.
The memory usage is kept to a minimum by using temporary files to store the batches and memory mapping them to avoid loading all the data at once, but the data will be fully loaded when the batches are merged.
Note that this node can leak temporary files if the generator is not closed.
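The sketch below shows one way a caller can make sure such a generator is closed. It assumes the cleanup lives in a `finally` block inside the generator, which the text above does not confirm. `spilled_values` is a hypothetical stand-in for the node's `batches()` method.

```python
import contextlib
import os
import tempfile


def spilled_values(values):
    """Hypothetical generator that spills values to a temp file and streams them back."""
    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w") as spill:
            spill.writelines(f"{value}\n" for value in sorted(values))
        with open(path) as spill:
            for line in spill:
                yield path, int(line)
    finally:
        # Runs when the generator is exhausted or explicitly closed.
        os.remove(path)


# closing() calls gen.close() on exit, so the temporary file is removed
# even though only the first value is consumed.
with contextlib.closing(spilled_values([3, 1, 2])) as gen:
    path, first = next(gen)

file_still_exists = os.path.exists(path)
```

Without the `closing()` wrapper (or an explicit `gen.close()`), a partially consumed generator would keep its temporary file until the generator object is finalized.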
- class datapyground.compute.sorting.SortNode(keys: list[str], descending: list[bool], child: QueryPlanNode)
Sort data in-memory based on one or more columns.
The node expects a list of columns and a list of sort directions. The data will be sorted based on the columns in the order they are provided.
The sort directions are used to specify if the sorting should be ascending or descending.
>>> import pyarrow as pa
>>> from datapyground.compute import PyArrowTableDataSource
>>> from datapyground.compute.sorting import SortNode
>>> data = pa.record_batch({"values": [1, 2, 3, 4, 5]})
>>> # Sort the data in descending order
>>> sort = SortNode(["values"], [True], PyArrowTableDataSource(data))
>>> next(sort.batches())
pyarrow.RecordBatch
values: int64
----
values: [5,4,3,2,1]
- Parameters:
keys – The columns to sort by in the order they should be sorted.
descending – Whether each column should be sorted in descending order.
child – The node emitting the data to be sorted.
- batches() → Generator[RecordBatch, None, None]
Apply the sorting to the child node.
Batches provided by the child node are accumulated until they are all loaded in memory, then they are merged and sorted as a single table.
This is usually faster but requires more memory and might run out of memory (OOM) for large datasets.