Source code for datapyground.compute.datasources

"""Query Plan nodes that load data

The datasource nodes are expected to fetch the data from some source,
convert it into the format accepted by the compute engine and forward it
to the next node in the plan.

They are used to do things like loading data from CSV or Parquet files,
or exposing Arrow data that is already in memory to a query plan.
"""

from abc import abstractmethod

import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet

from .base import QueryPlanNode


class DataSourceNode(QueryPlanNode):
    """Common parent of the query plan nodes that bring data into a plan.

    Subclasses fetch the data from some source and, besides emitting it,
    have to be able to report its schema through :meth:`poll_schema`.
    """

    @abstractmethod
    def poll_schema(self) -> pa.Schema:
        """Report the schema of the data source without loading its content.

        :return: The Arrow schema of the data provided by the source.
        """
        ...
class CSVDataSource(DataSourceNode):
    """Load data from a CSV file.

    Given a local CSV file path, load the content,
    convert it into Arrow format, and emit it
    for the next nodes of the query plan to consume.
    """

    def __init__(self, filename: str, block_size: int | None = None) -> None:
        """
        :param filename: The path of the local CSV file.
        :param block_size: How big to make batches of data,
                           Influences how many batches will be produced.
                           It is forwarded as-is to
                           :class:`pyarrow.csv.ReadOptions`.
        """
        self.filename = filename
        self.block_size = block_size

    def __str__(self) -> str:
        return f"CSVDataSource({self.filename}, block_size={self.block_size})"

    def _read_options(self) -> pa.csv.ReadOptions:
        """Build the read options used by every reader opened on the file."""
        return pa.csv.ReadOptions(block_size=self.block_size)

    def batches(self) -> QueryPlanNode.RecordBatchesGenerator:
        """Open the CSV file and emit its content one batch at a time.

        The file stays open only while the generator is being consumed.

        :return: A generator yielding the record batches read from the file.
        """
        with pa.csv.open_csv(
            self.filename, read_options=self._read_options()
        ) as reader:
            yield from reader

    def poll_schema(self) -> pa.Schema:
        """Poll the schema of the CSV file.

        The file is opened with the same read options used by
        :meth:`batches`: the streaming reader infers the column types from
        the first block it reads, so a different block size could lead to
        a schema that does not match the one of the emitted batches.

        :return: The schema of the batches that :meth:`batches` emits.
        """
        with pa.csv.open_csv(
            self.filename, read_options=self._read_options()
        ) as reader:
            return reader.schema
class ParquetDataSource(DataSourceNode):
    """Load data from a Parquet file.

    Given a local parquet file path, load the content,
    convert it into Arrow format, and emit it
    for the next nodes of the query plan to consume.
    """

    # Batch size applied when the caller does not provide one.
    DEFAULT_BATCH_SIZE = 65536

    def __init__(self, filename: str, batch_size: int | None = None) -> None:
        """
        :param filename: The path of the local parquet file.
        :param batch_size: How big to make batches of data,
                           Influences how many batches will be produced.
                           When omitted, ``DEFAULT_BATCH_SIZE`` is used.
        """
        self.filename = filename
        # ``or`` rather than an ``is None`` check: a batch size of 0
        # falls back to the default exactly like ``None`` does.
        self.batch_size = batch_size or self.DEFAULT_BATCH_SIZE

    def __str__(self) -> str:
        return f"ParquetDataSource({self.filename}, batch_size={self.batch_size})"

    def batches(self) -> QueryPlanNode.RecordBatchesGenerator:
        """Open the Parquet file and emit its content one batch at a time.

        The file stays open only while the generator is being consumed.

        :return: A generator yielding the record batches read from the file.
        """
        with pa.parquet.ParquetFile(self.filename) as reader:
            yield from reader.iter_batches(batch_size=self.batch_size)

    def poll_schema(self) -> pa.Schema:
        """Poll the schema of the Parquet file.

        :return: The Arrow schema exposed by the Parquet file.
        """
        with pa.parquet.ParquetFile(self.filename) as reader:
            return reader.schema_arrow
class PyArrowTableDataSource(DataSourceNode):
    """Expose data that is already in memory as a datasource.

    Wraps a :class:`pyarrow.Table` or a :class:`pyarrow.RecordBatch`
    so that its content can be used in a query plan.
    """

    def __init__(self, table: pa.Table | pa.RecordBatch) -> None:
        """
        :param table: The table or recordbatch holding the data to expose.
        """
        # Remembered once here so that ``batches`` knows how to emit the data.
        self.is_recordbatch = isinstance(table, pa.RecordBatch)
        self.table = table

    def __str__(self) -> str:
        names = self.table.column_names
        row_count = self.table.num_rows
        return f"PyArrowTableDataSource(columns={names}, rows={row_count})"

    def batches(self) -> QueryPlanNode.RecordBatchesGenerator:
        """Emit the wrapped data for consumption by the other nodes.

        A table is emitted as the batches returned by ``to_batches()``,
        while a record batch is emitted as is.

        :return: A generator yielding the record batches of the wrapped data.
        """
        if not self.is_recordbatch:
            yield from self.table.to_batches()
            return
        yield self.table

    def poll_schema(self) -> pa.Schema:
        """Poll the schema of the wrapped table or record batch.

        :return: The ``schema`` attribute of the wrapped object.
        """
        return self.table.schema