from dataclasses import dataclass
from huggingface_hub import HfApi, hf_hub_download, upload_file
import pyarrow.parquet as pq
from datetime import datetime, timezone
import tempfile
import hashlib
import toml
import os
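# ROOT points at the repository root (the parent of this module's directory),
# so the asset paths below resolve no matter where the Space is launched from.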
_ROOT = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_ROOT)
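# Presumably passed to the Gradio UI: page title plus the custom CSS/JS/theme
# assets shipped under assets/.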
GR_CONF = {
    "title": "OpenVideo",
    "css": f"{ROOT}/assets/design.css",
    "js": f"{ROOT}/assets/design.js",
    "theme": f"{ROOT}/assets/theme.json",
}


@dataclass
class HFUser:
    """Hugging Face Hub helper for the OpenVideo Space: auth, dataset file access, and tag uploads."""

    token: str
    name: str
    avatar: str
    hf_api: HfApi
    repo: str | None = None

    @classmethod
    def from_token(cls, _token: str) -> "HFUser":
        # Build an authenticated client and resolve the account's display info.
        _api = HfApi(token=_token, endpoint="https://huggingface.co.")
        raw = _api.whoami()
        _name = raw["name"]
        _avatar = f"https://huggingface.co.{raw['avatarUrl']}"
        return cls(token=_token, name=_name, avatar=_avatar, hf_api=_api)

    @classmethod
    def empty(cls) -> "HFUser":
        # Placeholder instance used before anyone has logged in.
        return cls(token=None, name=None, avatar=None, hf_api=None)

    def auth(self, _token: str) -> str:
        # Re-authenticate this instance in place and return the resolved account name.
        _api = HfApi(token=_token, endpoint="https://huggingface.co.")
        raw = _api.whoami()
        self.token = _token
        self.hf_api = _api
        self.name = raw["name"]
        self.avatar = f"https://huggingface.co.{raw['avatarUrl']}"
        return self.name

    def ping(self, name) -> str:
        # Only the authenticated operator gets a timestamp back.
        if name == self.name:
            _utc = datetime.now(timezone.utc)
            return _utc.strftime('%Y-%m-%d')
        return "Unauthorized OP"

    def list_dataset(self, repo, path="data"):
        # Remember the active repo and list the files of interest inside it.
        self.repo = repo
        _raw = self.hf_api.list_repo_files(repo, repo_type="dataset")
        if repo == "OpenVideo/Sample-2k":
            files = filter(lambda f: f.endswith(".mp4"), _raw)
        else:
            # Repo paths always use "/" separators, regardless of the local OS.
            files = filter(lambda f: f.startswith(path + "/"), _raw)
        return list(files)

    def fetch_file(self, fname, local_dir="/dev/shm"):
        # Ask huggingface_hub to use the hf_transfer backend for faster downloads.
        os.environ["TOKIO_WORKER_THREADS"] = "8"
        os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
        file = hf_hub_download(repo_id=self.repo, filename=fname, repo_type="dataset",
                               local_dir=local_dir, token=self.token)
        return file

    def split_parquet(self, pq_path, batch_size=1):
        # Stream the parquet file batch by batch, dumping every embedded video
        # blob to its own temporary .mp4 file; returns a list of path batches.
        tasks = []
        print(pq_path, batch_size)
        pf = pq.ParquetFile(pq_path)
        for batch in pf.iter_batches(batch_size):
            _chunk = []
            df = batch.to_pandas()
            for binary in df["video"]:
                if binary:
                    _v = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    with open(_v.name, "wb") as f:
                        _ = f.write(binary)
                    _chunk.append(_v.name)
            tasks.append(_chunk)
            del df
        return tasks

    def fetch_file_slice(self, fname, batch_size):
        # fetch_file already knows the active repo, so only the filename is passed.
        pq_path = self.fetch_file(fname, local_dir="/tmp")
        file_slice = self.split_parquet(pq_path, batch_size)
        return file_slice

    def dump_video(self, binary):
        # delete=False keeps the temp file alive after the handle is garbage collected,
        # so the returned path remains valid.
        vt = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        with open(vt.name, "wb") as f:
            _ = f.write(binary)
        return vt.name

    def bytes_md5(self, binary):
        md5_hash = hashlib.md5(binary).hexdigest()
        return md5_hash

    def tag(self, binary, tags):
        md5_hash = self.bytes_md5(binary)
        _info = {
            "md5": md5_hash,
            "tags": tags,
            "operator_name": self.name,
        }
        return _info

    def push_tag(self, info, target_repo):
        md5 = info["md5"]
        _raw = f"raw/{md5}/{md5}.mp4"  # currently unused
        idx_path = f"index/{md5}/{md5}.toml"
        # The index directory must exist locally before the TOML file can be written.
        os.makedirs(os.path.dirname(idx_path), exist_ok=True)
        with open(idx_path, "w") as f:
            toml.dump(info, f)
        _repo = upload_file(
            path_or_fileobj=idx_path,
            path_in_repo=idx_path,
            repo_id=target_repo,
            token=self.token,
            repo_type="dataset",
        )
        return _repo


if __name__ == "__main__":
    # Quick local smoke test: authenticate, then split a parquet file into
    # per-video temp files (token and path are placeholders).
    u = HFUser.from_token("hf_xxxxx")
    _patch = u.split_parquet("/opt/000000.parquet", batch_size=4)
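
    # Hedged sketch (not in the original file): one way the split batches could
    # feed the tag -> push_tag flow; "OpenVideo/labels" and the "example" tag
    # are hypothetical placeholders.
    # for chunk in _patch:
    #     for video_path in chunk:
    #         with open(video_path, "rb") as fh:
    #             info = u.tag(fh.read(), tags=["example"])
    #         u.push_tag(info, target_repo="OpenVideo/labels")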