Spaces:
Sleeping
Sleeping
from collections import deque | |
from typing import Dict, List | |
from speakers.server.model.flow_data import BaseFlowData | |
class Bootstrap: | |
"""Used by web module to decide which secret for securing""" | |
_NONCE: str = '' | |
"""最大的任务队列""" | |
_MAX_ONGOING_TASKS: int = 1 | |
"""任务队列""" | |
_QUEUE: deque = deque() | |
"""进行的任务数据""" | |
_TASK_DATA: Dict[str, BaseFlowData] = {} | |
"""进行的任务状态""" | |
_TASK_STATES = {} | |
"""正在进行的任务""" | |
_ONGOING_TASKS: List[str] = [] | |
def __init__(self): | |
self._version = "v0.0.1" | |
def from_config(cls, cfg=None): | |
return cls() | |
def version(self): | |
return self._version | |
def max_ongoing_tasks(self) -> int: | |
return self._MAX_ONGOING_TASKS | |
def ongoing_tasks(self) -> List[str]: | |
return self._ONGOING_TASKS | |
def queue(self) -> deque: | |
return self._QUEUE | |
def task_data(self) -> Dict[str, BaseFlowData]: | |
return self._TASK_DATA | |
def task_states(self) -> dict: | |
return self._TASK_STATES | |
def nonce(self) -> str: | |
return self._NONCE | |
def set_nonce(self, nonce: str): | |
self._NONCE = nonce | |
async def run(cls): | |
raise NotImplementedError | |