Spaces:
Running
Running
import math | |
import pickle | |
import tempfile | |
from functools import partial | |
from typing import Iterator, Optional, Union | |
import pyarrow as pa | |
import pyarrow.parquet as pq | |
from huggingface_hub import CommitOperationAdd, HfFileSystem | |
from pyspark.sql.dataframe import DataFrame | |
from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema | |
spark = None | |
def set_session(session): | |
global spark | |
spark = session | |
def _read(iterator: Iterator[pa.RecordBatch], columns: Optional[list[str]], filters: Optional[Union[list[tuple], list[list[tuple]]]], **kwargs) -> Iterator[pa.RecordBatch]: | |
for batch in iterator: | |
paths = batch[0].to_pylist() | |
ds = pq.ParquetDataset(paths, **kwargs) | |
yield from ds._dataset.to_batches(columns=columns, filter=pq.filters_to_expression(filters) if filters else None) | |
def read_parquet( | |
path: str, | |
columns: Optional[list[str]] = None, | |
filters: Optional[Union[list[tuple], list[list[tuple]]]] = None, | |
**kwargs, | |
) -> DataFrame: | |
""" | |
Loads Parquet files from Hugging Face using PyArrow, returning a PySPark `DataFrame`. | |
It reads Parquet files in a distributed manner. | |
Access private or gated repositories using `huggingface-cli login` or passing a token | |
using the `storage_options` argument: `storage_options={"token": "hf_xxx"}` | |
Parameters | |
---------- | |
path : str | |
Path to the file. Prefix with a protocol like `hf://` to read from Hugging Face. | |
You can read from multiple files if you pass a globstring. | |
columns : list, default None | |
If not None, only these columns will be read from the file. | |
filters : List[Tuple] or List[List[Tuple]], default None | |
To filter out data. | |
Filter syntax: [[(column, op, val), ...],...] | |
where op is [==, =, >, >=, <, <=, !=, in, not in] | |
The innermost tuples are transposed into a set of filters applied | |
through an `AND` operation. | |
The outer list combines these sets of filters through an `OR` | |
operation. | |
A single list of tuples can also be used, meaning that no `OR` | |
operation between set of filters is to be conducted. | |
**kwargs | |
Any additional kwargs are passed to pyarrow.parquet.ParquetDataset. | |
Returns | |
------- | |
DataFrame | |
DataFrame based on parquet file. | |
Examples | |
-------- | |
>>> path = "hf://datasets/username/dataset/data.parquet" | |
>>> pd.DataFrame({"foo": range(5), "bar": range(5, 10)}).to_parquet(path) | |
>>> read_parquet(path).show() | |
+---+---+ | |
|foo|bar| | |
+---+---+ | |
| 0| 5| | |
| 1| 6| | |
| 2| 7| | |
| 3| 8| | |
| 4| 9| | |
+---+---+ | |
>>> read_parquet(path, columns=["bar"]).show() | |
+---+ | |
|bar| | |
+---+ | |
| 5| | |
| 6| | |
| 7| | |
| 8| | |
| 9| | |
+---+ | |
>>> sel = [("foo", ">", 2)] | |
>>> read_parquet(path, filters=sel).show() | |
+---+---+ | |
|foo|bar| | |
+---+---+ | |
| 3| 8| | |
| 4| 9| | |
+---+---+ | |
""" | |
filesystem: HfFileSystem = kwargs.pop("filesystem") if "filesystem" in kwargs else HfFileSystem(**kwargs.pop("storage_options", {})) | |
paths = filesystem.glob(path) | |
if not paths: | |
raise FileNotFoundError(f"Counldn't find any file at {path}") | |
rdd = spark.sparkContext.parallelize([{"path": path} for path in paths], len(paths)) | |
df = spark.createDataFrame(rdd) | |
arrow_schema = pq.read_schema(filesystem.open(paths[0])) | |
schema = pa.schema([field for field in arrow_schema if (columns is None or field.name in columns)], metadata=arrow_schema.metadata) | |
return df.mapInArrow( | |
partial(_read, columns=columns, filters=filters, filesystem=filesystem, schema=arrow_schema, **kwargs), | |
from_arrow_schema(schema), | |
) | |
def _preupload(iterator: Iterator[pa.RecordBatch], path: str, schema: pa.Schema, filesystem: HfFileSystem, row_group_size: Optional[int] = None, **kwargs) -> Iterator[pa.RecordBatch]: | |
resolved_path = filesystem.resolve_path(path) | |
with tempfile.NamedTemporaryFile(suffix=".parquet") as temp_file: | |
with pq.ParquetWriter(temp_file.name, schema=schema, **kwargs) as writer: | |
for batch in iterator: | |
writer.write_batch(batch, row_group_size=row_group_size) | |
addition = CommitOperationAdd(path_in_repo=temp_file.name, path_or_fileobj=temp_file.name) | |
filesystem._api.preupload_lfs_files(repo_id=resolved_path.repo_id, additions=[addition], repo_type=resolved_path.repo_type, revision=resolved_path.revision) | |
yield pa.record_batch({"addition": [pickle.dumps(addition)]}, schema=pa.schema({"addition": pa.binary()})) | |
def _commit(iterator: Iterator[pa.RecordBatch], path: str, filesystem: HfFileSystem, max_operations_per_commit=50) -> Iterator[pa.RecordBatch]: | |
resolved_path = filesystem.resolve_path(path) | |
additions: list[CommitOperationAdd] = [pickle.loads(addition) for addition in pa.Table.from_batches(iterator, schema=pa.schema({"addition": pa.binary()}))[0].to_pylist()] | |
num_commits = math.ceil(len(additions) / max_operations_per_commit) | |
for shard_idx, addition in enumerate(additions): | |
addition.path_in_repo = resolved_path.path_in_repo.replace("{shard_idx:05d}", f"{shard_idx:05d}") | |
for i in range(0, num_commits): | |
operations = additions[i * max_operations_per_commit : (i + 1) * max_operations_per_commit] | |
commit_message = "Upload using PySpark" + (f" (part {i:05d}-of-{num_commits:05d})" if num_commits > 1 else "") | |
filesystem._api.create_commit(repo_id=resolved_path.repo_id, repo_type=resolved_path.repo_type, revision=resolved_path.revision, operations=operations, commit_message=commit_message) | |
yield pa.record_batch({"path": [addition.path_in_repo for addition in operations]}, schema=pa.schema({"path": pa.string()})) | |
def write_parquet(df: DataFrame, path: str, **kwargs) -> None: | |
""" | |
Write Parquet files to Hugging Face using PyArrow. | |
It uploads Parquet files in a distributed manner in two steps: | |
1. Preupload the Parquet files in parallel in a distributed banner | |
2. Commit the preuploaded files | |
Authenticate using `huggingface-cli login` or passing a token | |
using the `storage_options` argument: `storage_options={"token": "hf_xxx"}` | |
Parameters | |
---------- | |
path : str | |
Path of the file or directory. Prefix with a protocol like `hf://` to read from Hugging Face. | |
It writes Parquet files in the form "part-xxxxx.parquet", or to a single file if `path ends with ".parquet". | |
**kwargs | |
Any additional kwargs are passed to pyarrow.parquet.ParquetWriter. | |
Returns | |
------- | |
DataFrame | |
DataFrame based on parquet file. | |
Examples | |
-------- | |
>>> spark.createDataFrame(pd.DataFrame({"foo": range(5), "bar": range(5, 10)})) | |
>>> # Save to one file | |
>>> write_parquet(df, "hf://datasets/username/dataset/data.parquet") | |
>>> # OR save to a directory (possibly in many files) | |
>>> write_parquet(df, "hf://datasets/username/dataset") | |
""" | |
filesystem: HfFileSystem = kwargs.pop("filesystem", HfFileSystem(**kwargs.pop("storage_options", {}))) | |
if path.endswith(".parquet") or path.endswith(".pq"): | |
df = df.coalesce(1) | |
else: | |
path += "/part-{shard_idx:05d}.parquet" | |
df.mapInArrow( | |
partial(_preupload, path=path, schema=to_arrow_schema(df.schema), filesystem=filesystem, **kwargs), | |
from_arrow_schema(pa.schema({"addition": pa.binary()})), | |
).repartition(1).mapInArrow( | |
partial(_commit, path=path, filesystem=filesystem), | |
from_arrow_schema(pa.schema({"path": pa.string()})), | |
).collect() |