Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import string | |
# import platform | |
import time | |
import datetime | |
import json | |
# import numpy as np | |
# import threading | |
# import cv2 | |
# import PIL.Image as Image | |
# import ffmpeg | |
from io import BytesIO | |
# from queue import Queue | |
# import glob | |
import oss2 | |
import random | |
import requests | |
import shutil | |
# import torch | |
# import ctypes | |
use_internal_network = False | |
# OSSAccessKeyId = os.getenv('OSSAccessKeyId', "") | |
# OSSAccessKeySecret = os.getenv('OSSAccessKeySecret', "") | |
def get_random_string(): | |
now = datetime.datetime.now() | |
date = now.strftime('%Y%m%d') | |
time = now.strftime('%H%M%S') | |
microsecond = now.strftime('%f') | |
microsecond = microsecond[:6] # 取前6位,即微秒 | |
rand_num = ''.join([str(random.randint(0, 9)) for _ in range(6)]) | |
random_string = ''.join(random.choices(string.ascii_uppercase, k=6)) # ascii_lowercase | |
return date + "-" + time + "-" + microsecond + "-" + random_string | |
class ossService(): | |
def __init__(self, OSSAccessKeyId, OSSAccessKeySecret, Endpoint, BucketName, ObjectName): | |
self.AccessKeyId = OSSAccessKeyId | |
self.AccessKeySecret = OSSAccessKeySecret | |
self.Endpoint = Endpoint | |
self.BucketName = BucketName # "vigen-video" | |
self.ObjectName = ObjectName # "VideoGeneration" | |
self.Prefix = "oss://" + self.BucketName | |
auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret) | |
self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName) | |
# oss_url: eg: oss://BucketName/ObjectName/xxx.mp4 | |
def sign(self, oss_url, timeout=86400): | |
try: | |
oss_path = oss_url[len("oss://" + self.BucketName + "/"):] | |
return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True) | |
except Exception as e: | |
print("sign error: {}".format(e)) | |
return 0, "" | |
def uploadOssFile(self, oss_full_path, local_full_path): | |
try: | |
self.bucket.put_object_from_file(oss_full_path, local_full_path) | |
return self.sign(self.Prefix+"/"+oss_full_path, timeout=86400) | |
except oss2.exceptions.OssError as e: | |
print("oss upload error: ", e) | |
return 0, "" | |
def downloadOssFile(self, oss_full_path, local_full_path): | |
status = 1 | |
try: | |
self.bucket.get_object_to_file(oss_full_path, local_full_path) | |
except oss2.exceptions.OssError as e: | |
print("oss download error: ", e) | |
status = 0 | |
return status | |
def downloadFile(self, file_full_url, local_full_path): | |
status = 1 | |
response = requests.get(file_full_url) | |
if response.status_code == 200: | |
with open(local_full_path, "wb") as f: | |
f.write(response.content) | |
else: | |
print("oss download error. ") | |
status = 0 | |
return status |