import hashlib
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone

import pyarrow.parquet as pq
import toml
from huggingface_hub import HfApi, hf_hub_download, upload_file
# Repo root is one level above this file's directory.
_ROOT = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_ROOT)

# Static assets for the Gradio UI.
GR_CONF = {
    "title": "OpenVideo",
    "css": f"{ROOT}/assets/design.css",
    "js": f"{ROOT}/assets/design.js",
    "theme": f"{ROOT}/assets/theme.json",
}
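
# Hedged sketch (not in the original file): these keys mirror keyword
# arguments accepted by gradio.Blocks, so the app presumably consumes
# GR_CONF roughly along these lines (loading the theme json is elided):
#
#   import gradio as gr
#   with gr.Blocks(title=GR_CONF["title"], css=GR_CONF["css"],
#                  js=GR_CONF["js"]) as demo:
#       ...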

@dataclass
class HFUser:
    token: str | None
    name: str | None
    avatar: str | None
    hf_api: HfApi | None
    repo: str | None = None
    @classmethod
    def from_token(cls, _token: str) -> "HFUser":
        _api = HfApi(token=_token, endpoint="https://huggingface.co")
        raw = _api.whoami()
        _name = raw["name"]
        _avatar = f"https://huggingface.co{raw['avatarUrl']}"
        return cls(token=_token, name=_name, avatar=_avatar, hf_api=_api)
    @classmethod
    def empty(cls) -> "HFUser":
        return cls(token=None, name=None, avatar=None, hf_api=None)
    def auth(self, _token: str) -> str:
        _api = HfApi(token=_token, endpoint="https://huggingface.co")
        raw = _api.whoami()
        self.token = _token
        self.hf_api = _api
        self.name = raw["name"]
        self.avatar = f"https://huggingface.co{raw['avatarUrl']}"
        return self.name
    def ping(self, name) -> str:
        # Only the authenticated operator may ping; answer with today's UTC date.
        if name == self.name:
            _utc = datetime.now(timezone.utc)
            return _utc.strftime("%Y-%m-%d")
        return "Unauthorized OP"
    def list_dataset(self, repo, path="data"):
        self.repo = repo
        _raw = self.hf_api.list_repo_files(repo, repo_type="dataset")
        if repo == "OpenVideo/Sample-2k":
            files = filter(lambda f: f.endswith(".mp4"), _raw)
        else:
            # Hub repo paths always use forward slashes regardless of the
            # local OS, so do not use os.sep here.
            files = filter(lambda f: f.startswith(path + "/"), _raw)
        return list(files)
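
    # Usage sketch (file names illustrative only):
    #   user.list_dataset("OpenVideo/Sample-2k")  # -> [".../clip.mp4", ...]
    #   user.list_dataset("org/repo")             # -> ["data/000000.parquet", ...]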
    def fetch_file(self, fname, local_dir="/dev/shm"):
        # Hint the downloader's thread pool and opt in to hf_transfer.
        # Note: huggingface_hub reads HF_HUB_ENABLE_HF_TRANSFER at import
        # time, so this only takes effect if set before the library is
        # first imported.
        os.environ["TOKIO_WORKER_THREADS"] = "8"
        os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
        file = hf_hub_download(
            repo_id=self.repo,
            filename=fname,
            repo_type="dataset",
            local_dir=local_dir,
            token=self.token,
        )
        return file
    def split_parquet(self, pq_path, batch_size=1):
        # Stream the parquet file batch by batch; each batch becomes a list
        # of temporary .mp4 paths extracted from the binary "video" column.
        tasks = []
        print(pq_path, batch_size)
        pf = pq.ParquetFile(pq_path)
        for batch in pf.iter_batches(batch_size=batch_size):
            _chunk = []
            df = batch.to_pandas()
            for binary in df["video"]:
                if binary:
                    # delete=False keeps the file alive after the handle is
                    # garbage-collected; callers own the cleanup.
                    _v = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    with open(_v.name, "wb") as f:
                        _ = f.write(binary)
                    _chunk.append(_v.name)
            tasks.append(_chunk)
            del df
        return tasks
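
    # Shape sketch (illustrative): with batch_size=2 and five rows whose
    # "video" column is non-empty, `tasks` would look like
    #   [[p1, p2], [p3, p4], [p5]]
    # where each p_i is a temporary .mp4 path.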
    def fetch_file_slice(self, fname, batch_size):
        # fetch_file already knows self.repo; passing the repo as the first
        # positional argument was a bug (it became the filename).
        pq_path = self.fetch_file(fname, local_dir="/tmp")
        file_slice = self.split_parquet(pq_path, batch_size)
        return file_slice
    def dump_video(self, binary):
        # delete=False is required: with the default delete=True the file
        # would vanish as soon as `vt` is closed or garbage-collected,
        # leaving the returned path dangling.
        vt = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        with open(vt.name, "wb") as f:
            _ = f.write(binary)
        return vt.name
    def bytes_md5(self, binary):
        md5_hash = hashlib.md5(binary).hexdigest()
        return md5_hash
    def tag(self, binary, tags):
        md5_hash = self.bytes_md5(binary)
        _info = {
            "md5": md5_hash,
            "tags": tags,
            "operator_name": self.name,
        }
        return _info
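
    # Example record (values illustrative only):
    #   {"md5": "3f2a...", "tags": ["indoor", "blurry"], "operator_name": "alice"}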
    def push_tag(self, info, target_repo):
        md5 = info["md5"]
        # By convention the matching video lives at raw/{md5}/{md5}.mp4 in
        # the target repo; only the index entry is written here.
        idx_path = f"index/{md5}/{md5}.toml"
        # The intermediate directories must exist before writing locally.
        os.makedirs(os.path.dirname(idx_path), exist_ok=True)
        with open(idx_path, "w") as f:
            toml.dump(info, f)
        _repo = upload_file(
            path_or_fileobj=idx_path,
            path_in_repo=idx_path,
            repo_id=target_repo,
            token=self.token,
            repo_type="dataset",
        )
        return _repo
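
# Hedged sketch (hypothetical helper, not in the original file): given the
# repo layout implied by push_tag, the video bytes themselves could be
# uploaded to the matching raw/ prefix along these lines.
def push_raw(user: HFUser, binary: bytes, target_repo: str):
    md5 = user.bytes_md5(binary)
    local_path = user.dump_video(binary)
    return upload_file(
        path_or_fileobj=local_path,
        path_in_repo=f"raw/{md5}/{md5}.mp4",
        repo_id=target_repo,
        token=user.token,
        repo_type="dataset",
    )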

if __name__ == "__main__":
    u = HFUser.from_token("hf_xxxxx")
    _patch = u.split_parquet("/opt/000000.parquet", batch_size=4)