"""Hugging Face dataset helper used by the OpenVideo Gradio app.

Wraps authentication (whoami), dataset file listing/downloading, parquet
video extraction, and tag-index uploads behind a single ``HFUser`` object.
"""

from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import os
import tempfile

import pyarrow.parquet as pq
import toml
from huggingface_hub import HfApi, hf_hub_download, upload_file

# Resolve asset paths relative to this file so they work from any CWD.
_ROOT = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_ROOT)

# Gradio UI configuration: title plus custom CSS/JS/theme assets.
GR_CONF = {
    "title": "OpenVideo",
    "css": f"{ROOT}/assets/design.css",
    "js": f"{ROOT}/assets/design.js",
    "theme": f"{ROOT}/assets/theme.json",
}


@dataclass
class HFUser:
    """An authenticated Hugging Face user plus dataset helpers.

    Construct with :meth:`from_token` (eager auth), or :meth:`empty` then
    :meth:`auth` (deferred login, e.g. from a UI form).
    """

    token: str       # HF access token ("hf_...")
    name: str        # user name as reported by whoami()
    avatar: str      # absolute avatar URL
    hf_api: HfApi    # API client bound to `token`
    repo: str = None  # current dataset repo id; set by list_dataset()

    @classmethod
    def from_token(cls, _token: str) -> "HFUser":
        """Build a fully-initialized user by validating ``_token`` via whoami().

        Raises whatever ``HfApi.whoami`` raises on an invalid token.
        """
        _api = HfApi(token=_token, endpoint="https://huggingface.co.")
        raw = _api.whoami()
        _name = raw["name"]
        # avatarUrl is site-relative; prefix the hub host.
        # (Single quotes inside the f-string: double quotes are a
        # SyntaxError before Python 3.12.)
        _avatar = f"https://huggingface.co.{raw['avatarUrl']}"
        return cls(token=_token, name=_name, avatar=_avatar, hf_api=_api)

    @classmethod
    def empty(cls) -> "HFUser":
        """Return a credential-less placeholder (pre-login state)."""
        return cls(token=None, name=None, avatar=None, hf_api=None)

    def auth(self, _token: str) -> str:
        """Authenticate this instance in place and return the user name.

        Bug fix: the avatar was previously stored on ``self._avatar``,
        leaving the dataclass field ``avatar`` stale after login.
        """
        _api = HfApi(token=_token, endpoint="https://huggingface.co.")
        raw = _api.whoami()
        self.token = _token
        self.hf_api = _api
        self.name = raw["name"]
        self.avatar = f"https://huggingface.co.{raw['avatarUrl']}"
        return self.name

    def ping(self, name) -> str:
        """Return today's UTC date (YYYY-MM-DD) if ``name`` matches the
        logged-in user, otherwise the literal string "Unauthorized OP"."""
        if name == self.name:
            return datetime.now(timezone.utc).strftime('%Y-%m-%d')
        return "Unauthorized OP"

    def list_dataset(self, repo, path="data"):
        """List files in a dataset repo; remembers ``repo`` for later fetches.

        For the special sample repo, top-level ``.mp4`` files are returned;
        otherwise files under ``path`` (default ``data/``).
        """
        self.repo = repo
        _raw = self.hf_api.list_repo_files(repo, repo_type="dataset")
        if repo == "OpenVideo/Sample-2k":
            # This sample repo stores raw .mp4 files directly.
            return [f for f in _raw if f.endswith(".mp4")]
        # Hub repo paths always use "/" — os.sep would break on Windows.
        return [f for f in _raw if f.startswith(path + "/")]

    def fetch_file(self, fname, local_dir="/dev/shm"):
        """Download ``fname`` from the current repo and return the local path.

        Sets env vars enabling the accelerated hf_transfer download path;
        defaults to /dev/shm so the file lands in RAM-backed storage.
        """
        os.environ["TOKIO_WORKER_THREADS"] = "8"
        os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
        return hf_hub_download(
            repo_id=self.repo,
            filename=fname,
            repo_type="dataset",
            local_dir=local_dir,
            token=self.token,
        )

    def split_parquet(self, pq_path, batch_size=1):
        """Stream a parquet file and dump each row's ``video`` bytes to temp files.

        Returns a list of batches; each batch is a list of ``.mp4`` paths.
        Rows with empty/None video payloads are skipped.
        """
        tasks = []
        print(pq_path, batch_size)
        pf = pq.ParquetFile(pq_path)
        for batch in pf.iter_batches(batch_size):
            _chunk = []
            df = batch.to_pandas()
            for binary in df["video"]:
                if binary:
                    # delete=False: the path must outlive this handle —
                    # callers consume the files later.
                    _v = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    with open(_v.name, "wb") as f:
                        f.write(binary)
                    _chunk.append(_v.name)
            tasks.append(_chunk)
            del df  # release the pandas copy before decoding the next batch
        return tasks

    def fetch_file_slice(self, fname, batch_size):
        """Download a parquet file to /tmp and split it into video batches.

        Bug fix: ``fetch_file`` was called as ``(self.repo, fname, ...)``,
        which does not match its ``(fname, local_dir)`` signature and
        raised TypeError.
        """
        pq_path = self.fetch_file(fname, local_dir="/tmp")
        return self.split_parquet(pq_path, batch_size)

    def dump_video(self, binary):
        """Write video bytes to a persistent temp ``.mp4`` and return its path.

        Bug fix: ``delete=False`` is required — without it the file is
        removed as soon as the NamedTemporaryFile handle is closed or
        garbage-collected, leaving the returned path dangling.
        """
        vt = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        with open(vt.name, "wb") as f:
            f.write(binary)
        return vt.name

    def bytes_md5(self, binary):
        """MD5 hex digest of ``binary`` (content addressing, not security)."""
        return hashlib.md5(binary).hexdigest()

    def tag(self, binary, tags):
        """Build a tagging record: content hash, tags, and operator name."""
        return {
            "md5": self.bytes_md5(binary),
            "tags": tags,
            "operator_name": self.name,
        }

    def push_tag(self, info, target_repo):
        """Serialize ``info`` to ``index/<md5>/<md5>.toml`` and upload it.

        Bug fix: the local ``index/<md5>/`` directory is created before
        writing — ``open(..., "w")`` fails if it does not exist. The
        unused ``raw/...`` path variable was dropped.
        """
        md5 = info["md5"]
        idx_path = f"index/{md5}/{md5}.toml"
        os.makedirs(os.path.dirname(idx_path), exist_ok=True)
        with open(idx_path, "w") as f:
            toml.dump(info, f)
        return upload_file(
            path_or_fileobj=idx_path,
            path_in_repo=idx_path,
            repo_id=target_repo,
            token=self.token,
            repo_type="dataset",
        )


if __name__ == "__main__":
    u = HFUser.from_token("hf_xxxxx")
    _patch = u.split_parquet("/opt/000000.parquet", batch_size=4)