Spaces:
Sleeping
Sleeping
File size: 1,388 Bytes
04ffec9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
from collections import deque
from typing import Dict, List
from speakers.server.model.flow_data import BaseFlowData
class Bootstrap:
"""Used by web module to decide which secret for securing"""
_NONCE: str = ''
"""最大的任务队列"""
_MAX_ONGOING_TASKS: int = 1
"""任务队列"""
_QUEUE: deque = deque()
"""进行的任务数据"""
_TASK_DATA: Dict[str, BaseFlowData] = {}
"""进行的任务状态"""
_TASK_STATES = {}
"""正在进行的任务"""
_ONGOING_TASKS: List[str] = []
def __init__(self):
self._version = "v0.0.1"
@classmethod
def from_config(cls, cfg=None):
return cls()
@property
def version(self):
return self._version
@property
def max_ongoing_tasks(self) -> int:
return self._MAX_ONGOING_TASKS
@property
def ongoing_tasks(self) -> List[str]:
return self._ONGOING_TASKS
@property
def queue(self) -> deque:
return self._QUEUE
@property
def task_data(self) -> Dict[str, BaseFlowData]:
return self._TASK_DATA
@property
def task_states(self) -> dict:
return self._TASK_STATES
@property
def nonce(self) -> str:
return self._NONCE
def set_nonce(self, nonce: str):
self._NONCE = nonce
@classmethod
async def run(cls):
raise NotImplementedError
|