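"""User/session helpers for the OpenVideo app.

Wraps an authenticated Hugging Face `HfApi` client to list dataset files,
download and split parquet-packed videos, and push tag metadata (TOML index
files) back to a target dataset repo.
"""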
from dataclasses import dataclass
from huggingface_hub import HfApi, hf_hub_download, upload_file
import pyarrow.parquet as pq
from datetime import datetime, timezone
import tempfile
import hashlib
import toml
import os
_ROOT = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_ROOT)
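# UI configuration for the front end: page title plus paths to the bundled CSS/JS/theme assets.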
GR_CONF = {
"title": "OpenVideo",
"css": f"{ROOT}/assets/design.css",
"js": f"{ROOT}/assets/design.js",
"theme": f"{ROOT}/assets/theme.json",
}
@dataclass
class HFUser:
token: str
name: str
avatar: str
hf_api: HfApi
    repo: str | None = None
@classmethod
def from_token(cls, _token: str) -> "HFUser":
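        """Build an authenticated HFUser from a Hugging Face access token."""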
_api = HfApi(token=_token, endpoint="https://huggingface.co.")
raw = _api.whoami()
_name = raw["name"]
        _avatar = f"https://huggingface.co.{raw['avatarUrl']}"
return cls(token=_token, name=_name, avatar=_avatar, hf_api=_api)
@classmethod
def empty(cls) -> "HFUser":
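        """Return a placeholder user with no credentials (pre-login state)."""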
return cls(token=None, name=None, avatar=None, hf_api=None)
    def auth(self, _token: str) -> str:
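        """Re-authenticate this instance with a new token and return the user name."""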
_api = HfApi(token=_token, endpoint="https://huggingface.co.")
raw = _api.whoami()
self.token = _token
self.hf_api = _api
self.name = raw["name"]
        self.avatar = f"https://huggingface.co.{raw['avatarUrl']}"
return self.name
def ping(self, name) -> str:
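        """Return today's UTC date for the authenticated user, otherwise an error string."""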
        if name == self.name:
_utc = datetime.now(timezone.utc)
return _utc.strftime('%Y-%m-%d')
return "Unauthorized OP"
def list_dataset(self, repo, path="data"):
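        """Remember `repo` and list its files (.mp4s for the sample repo, else files under `path`)."""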
self.repo = repo
_raw = self.hf_api.list_repo_files(repo, repo_type="dataset")
if repo == "OpenVideo/Sample-2k":
files = filter(lambda f: f.endswith(".mp4"), _raw)
else:
            files = filter(lambda f: f.startswith(path + "/"), _raw)  # hub repo paths always use "/"
return list(files)
def fetch_file(self, fname, local_dir="/dev/shm"):
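        """Download a single file from the current dataset repo into `local_dir`."""
        # Note: HF_HUB_ENABLE_HF_TRANSFER is read when huggingface_hub is imported,
        # so setting it here may have no effect; exporting it before startup is safer.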
os.environ["TOKIO_WORKER_THREADS"] = "8"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
file = hf_hub_download(repo_id=self.repo, filename=fname, repo_type="dataset", local_dir=local_dir, token=self.token)
return file
def split_parquet(self, pq_path, batch_size=1):
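        """Split a parquet file of videos into batches of temporary .mp4 file paths."""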
tasks = []
print(pq_path, batch_size)
pf = pq.ParquetFile(pq_path)
for batch in pf.iter_batches(batch_size):
_chunk = []
df = batch.to_pandas()
for binary in df["video"]:
                if binary:
_v = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
with open(_v.name, "wb") as f:
_ = f.write(binary)
_chunk.append(_v.name)
tasks.append(_chunk)
del df
return tasks
def fetch_file_slice(self, fname, batch_size):
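        """Download a parquet file from the current repo and split it into video batches."""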
        pq_path = self.fetch_file(fname, local_dir="/tmp")
file_slice = self.split_parquet(pq_path, batch_size)
return file_slice
def dump_video(self, binary):
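        """Write raw video bytes to a temporary .mp4 file and return its path."""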
        # delete=False keeps the file on disk after the handle is garbage-collected,
        # since only the path is returned to the caller.
        vt = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
with open(vt.name, "wb") as f:
_ = f.write(binary)
return vt.name
def bytes_md5(self, binary):
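        """Return the hex MD5 digest of a bytes payload (used as a content ID)."""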
md5_hash = hashlib.md5(binary).hexdigest()
return md5_hash
def tag(self, binary, tags):
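        """Bundle a video's MD5, its tags, and the operator name into a tag record."""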
md5_hash = self.bytes_md5(binary)
_info = {
"md5": md5_hash,
"tags": tags,
"operator_name": self.name,
}
return _info
def push_tag(self, info, target_repo):
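        """Write the tag record to index/<md5>/<md5>.toml and upload it to `target_repo`."""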
md5 = info["md5"]
        _raw = f"raw/{md5}/{md5}.mp4"  # corresponding raw-video path in the repo (not used by this upload)
        idx_path = f"index/{md5}/{md5}.toml"
        os.makedirs(os.path.dirname(idx_path), exist_ok=True)  # local directory must exist before writing
        with open(idx_path, "w") as f:
            toml.dump(info, f)
_repo = upload_file(
path_or_fileobj=idx_path,
path_in_repo=idx_path,
            repo_id=target_repo,
token=self.token,
repo_type="dataset"
)
return _repo
if __name__ == "__main__":
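    # Ad-hoc manual check: authenticate with a token and split a local parquet file.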
u = HFUser.from_token("hf_xxxxx")
_patch = u.split_parquet("/opt/000000.parquet", batch_size=4)