File size: 4,128 Bytes
92b2313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8bd8a23
0dcde41
9ab6533
 
92b2313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from dataclasses import dataclass
from huggingface_hub import HfApi, hf_hub_download, upload_file
import pyarrow.parquet as pq
from datetime import datetime, timezone
import tempfile
import hashlib
import toml
import os

# Resolve the project root: this file lives one directory below it.
_ROOT = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_ROOT)

# Gradio UI configuration: window title plus paths to the bundled assets.
_ASSET_DIR = f"{ROOT}/assets"
GR_CONF = {
    "title": "OpenVideo",
    "css": f"{_ASSET_DIR}/design.css",
    "js": f"{_ASSET_DIR}/design.js",
    "theme": f"{_ASSET_DIR}/theme.json",
}

@dataclass
class HFUser:
    """An authenticated Hugging Face Hub user with dataset helpers.

    Wraps an ``HfApi`` client plus the user's identity, and provides
    utilities to list/download dataset files, split parquet video shards
    into temporary .mp4 files, and push tag metadata back to a repo.
    """

    token: str    # HF access token used for all authenticated calls
    name: str     # username reported by whoami()
    avatar: str   # absolute URL of the user's avatar image
    hf_api: HfApi  # configured API client

    repo: str = None  # active dataset repo id, set by list_dataset()

    @classmethod
    def from_token(cls, _token: str) -> "HFUser":
        """Build a fully initialized user from an access token."""
        _api = HfApi(token=_token, endpoint="https://huggingface.co.")
        raw = _api.whoami()
        _name = raw["name"]
        # Use single quotes for the inner subscript: nesting the same quote
        # type inside an f-string is only legal on Python 3.12+ (PEP 701).
        _avatar = f"https://huggingface.co.{raw['avatarUrl']}"
        return cls(token=_token, name=_name, avatar=_avatar, hf_api=_api)

    @classmethod
    def empty(cls) -> "HFUser":
        """Placeholder user with no credentials; authenticate later via auth()."""
        return cls(token=None, name=None, avatar=None, hf_api=None)

    def auth(self, _token: str) -> str:
        """Authenticate this user in place and return the resolved username."""
        _api = HfApi(token=_token, endpoint="https://huggingface.co.")
        raw = _api.whoami()

        self.token = _token
        self.hf_api = _api
        self.name = raw["name"]
        # BUG FIX: previously assigned to self._avatar, which left the
        # dataclass field self.avatar stale after re-authentication.
        self.avatar = f"https://huggingface.co.{raw['avatarUrl']}"
        return self.name

    def ping(self, name) -> str:
        """Return today's UTC date (YYYY-MM-DD) if `name` matches the session user.

        Returns the literal string "Unauthorized OP" otherwise.
        """
        if name == self.name:
            _utc = datetime.now(timezone.utc)
            return _utc.strftime('%Y-%m-%d')
        return "Unauthorized OP"

    def list_dataset(self, repo, path="data"):
        """List relevant files in `repo` and remember it as the active repo.

        For the sample repo, every .mp4 file is returned; for any other repo,
        only files under the `path`/ prefix are returned.
        """
        self.repo = repo
        _raw = self.hf_api.list_repo_files(repo, repo_type="dataset")
        if repo == "OpenVideo/Sample-2k":
            files = filter(lambda f: f.endswith(".mp4"), _raw)
        else:
            # BUG FIX: Hub repo paths always use "/" regardless of host OS;
            # os.sep would produce "data\\" on Windows and match nothing.
            files = filter(lambda f: f.startswith(path + "/"), _raw)
        return list(files)

    def fetch_file(self, fname, local_dir="/dev/shm"):
        """Download `fname` from the active dataset repo; return the local path."""
        # Enable accelerated multi-threaded downloads via hf_transfer.
        os.environ["TOKIO_WORKER_THREADS"] = "8"
        os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
        file = hf_hub_download(repo_id=self.repo, filename=fname, repo_type="dataset", local_dir=local_dir, token=self.token)
        return file

    def split_parquet(self, pq_path, batch_size=1):
        """Split a parquet shard's "video" column into temp .mp4 files.

        Returns a list of batches, each a list of temp-file paths. Files are
        created with delete=False so they survive for the caller to consume;
        the caller is responsible for removing them.
        """
        tasks = []
        print(pq_path, batch_size)
        pf = pq.ParquetFile(pq_path)
        for batch in pf.iter_batches(batch_size):
            _chunk = []
            df = batch.to_pandas()
            for binary in df["video"]:
                if binary:  # skip empty/null payloads
                    _v = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    with open(_v.name, "wb") as f:
                        _ = f.write(binary)
                    _chunk.append(_v.name)
            tasks.append(_chunk)
            del df  # release the decoded frame before the next batch
        return tasks

    def fetch_file_slice(self, fname, batch_size):
        """Download a parquet shard and split it into per-batch .mp4 temp files."""
        # BUG FIX: fetch_file's first parameter is the filename; passing
        # self.repo positionally pushed `fname` into local_dir's slot and
        # raised TypeError (duplicate local_dir) before any download started.
        pq_path = self.fetch_file(fname, local_dir="/tmp")
        file_slice = self.split_parquet(pq_path, batch_size)
        return file_slice

    def dump_video(self, binary):
        """Write a video byte blob to a temp .mp4 file and return its path."""
        # BUG FIX: delete=False is required — with the default delete=True the
        # file vanishes as soon as the handle is closed/garbage-collected,
        # leaving the returned path dangling (split_parquet already does this).
        vt = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        with open(vt.name, "wb") as f:
            _ = f.write(binary)
        return vt.name

    def bytes_md5(self, binary):
        """Return the hex MD5 digest of a byte blob (used as a content id)."""
        md5_hash = hashlib.md5(binary).hexdigest()
        return md5_hash

    def tag(self, binary, tags):
        """Build the tag-metadata record for a video blob."""
        md5_hash = self.bytes_md5(binary)
        _info = {
            "md5": md5_hash,
            "tags": tags,
            "operator_name": self.name,
        }
        return _info

    def push_tag(self, info, target_repo):
        """Serialize `info` to index/<md5>/<md5>.toml and upload it to `target_repo`."""
        md5 = info["md5"]
        idx_path = f"index/{md5}/{md5}.toml"

        # BUG FIX: the local index/<md5>/ directory must exist before the
        # open(..., "w") below, otherwise it raises FileNotFoundError.
        os.makedirs(os.path.dirname(idx_path), exist_ok=True)
        with open(idx_path, "w") as f:
            toml.dump(info, f)

        _repo = upload_file(
            path_or_fileobj=idx_path,
            path_in_repo=idx_path,
            repo_id=target_repo,
            token=self.token,
            repo_type="dataset",
        )
        return _repo

if __name__ == "__main__":
    # Smoke test: authenticate with a placeholder token, then slice a local
    # parquet shard into batches of 4 temp .mp4 files.
    user = HFUser.from_token("hf_xxxxx")
    _patch = user.split_parquet("/opt/000000.parquet", batch_size=4)