Spaces:
Sleeping
Sleeping
File size: 3,992 Bytes
8f809e2 3573a39 58c39e0 3573a39 3a0ee14 a89f9d8 3f85daf 3573a39 a89f9d8 3a0ee14 3573a39 a89f9d8 136af2d e95a647 136af2d 3573a39 a89f9d8 3a0ee14 136af2d 3a0ee14 136af2d 3a0ee14 9e4233f 8092547 3a0ee14 3573a39 3a0ee14 136af2d 1c00552 3a0ee14 be473e6 8092547 1c00552 8092547 3a0ee14 136af2d 3a0ee14 136af2d 3a0ee14 9e4233f 8092547 3a0ee14 3573a39 3a0ee14 1c00552 3a0ee14 a89f9d8 1c00552 a89f9d8 8dc0e4d 8092547 1c00552 a89f9d8 8092547 1c00552 a89f9d8 9e4233f 136af2d 9e4233f 136af2d 9e4233f 8f809e2 8092547 9e4233f 3573a39 9e4233f 136af2d 9e4233f 8092547 1c00552 8f809e2 9e4233f 1c00552 136af2d 9e4233f 8092547 9e4233f 3a0ee14 9e4233f 8f809e2 3573a39 8f809e2 e84a865 8f809e2 3573a39 8f809e2 e84a865 8f809e2 8092547 8f809e2 3573a39 affded7 8f809e2 affded7 8f809e2 a89f9d8 8f809e2 136af2d 8f809e2 136af2d affded7 041cafd 136af2d 8f809e2 e84a865 8f809e2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
import os
import subprocess
import yaml
import pipe
# Directory where the per-uid YAML config files ("{uid}_config.yaml") are stored.
YAML_PATH = "./cicd/configs"
class Dumper(yaml.Dumper):
    """YAML dumper whose indentation step always passes ``indentless=False``.

    With PyYAML this makes block sequences nested under a mapping key get
    indented relative to that key instead of starting at the key's column.
    """

    def increase_indent(self, flow=False, *args, **kwargs):
        """Increase indentation, overriding ``indentless`` to ``False``.

        Any extra positional or keyword arguments are accepted but not
        forwarded; only ``flow`` reaches the base implementation.
        """
        forced_options = {"flow": flow, "indentless": False}
        return super().increase_indent(**forced_options)
def get_yaml_path(uid):
    """Return the path of the YAML config file for ``uid``, creating it if needed.

    The directory ``YAML_PATH`` is created when missing. If the config file
    for ``uid`` does not exist yet, it is seeded by copying ``config.yaml``
    from the current working directory.

    Args:
        uid: Identifier used as the file-name prefix (``{uid}_config.yaml``).

    Returns:
        The string ``{YAML_PATH}/{uid}_config.yaml``.

    Raises:
        OSError: If the directory cannot be created or ``config.yaml`` cannot
            be copied (for example because it does not exist).
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    config_path = f"{YAML_PATH}/{uid}_config.yaml"
    # exist_ok avoids the check-then-create race between concurrent callers.
    os.makedirs(YAML_PATH, exist_ok=True)
    if not os.path.exists(config_path):
        # Copy in-process rather than shelling out to `cp`: this is portable,
        # `uid` is never interpreted by a shell, and a failed copy raises
        # instead of being silently ignored.
        shutil.copy("config.yaml", config_path)
    return config_path
def read_scanners(uid):
    """Read the list of scanners from the YAML config of ``uid``.

    Args:
        uid: Identifier passed to ``get_yaml_path`` to locate the config file.

    Returns:
        The value stored under the ``detectors`` key, or an empty list when
        the key is absent or the config file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        # NOTE(review): FullLoader can build more than plain data types;
        # consider yaml.safe_load if this file could hold untrusted content.
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML document loads as None, which has no .get().
    if not config:
        return []
    return config.get("detectors", [])
def write_scanners(scanners, uid):
    """Store ``scanners`` under the ``detectors`` key of the YAML config of ``uid``.

    The config file is read, updated in memory and written back in full.

    Args:
        scanners: Value to store under ``detectors`` (a list of scanners).
        uid: Identifier passed to ``get_yaml_path`` to locate the config file.
    """
    config_path = get_yaml_path(uid)
    with open(config_path, "r") as f:
        # An empty YAML document loads as None; start from an empty mapping
        # so the scanners are still saved instead of writing back `null`.
        config = yaml.load(f, Loader=yaml.FullLoader) or {}
    config["detectors"] = scanners
    # Save scanners to detectors in yaml.
    with open(config_path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
def read_inference_type(uid):
    """Read the ``inference_type`` setting from the YAML config of ``uid``.

    Args:
        uid: Identifier passed to ``get_yaml_path`` to locate the config file.

    Returns:
        The value stored under ``inference_type``, or an empty string when
        the key is absent or the config file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML document loads as None, which has no .get().
    if not config:
        return ""
    return config.get("inference_type", "")
def write_inference_type(use_inference, inference_token, uid):
    """Write the inference settings to the YAML config of ``uid``.

    Args:
        use_inference: When truthy, ``inference_type`` is set to
            ``"hf_inference_api"`` and the token is stored; otherwise
            ``inference_type`` is set to ``"hf_pipeline"`` and the token is
            cleared.
        inference_token: Token stored under ``inference_token`` when
            ``use_inference`` is truthy.
        uid: Identifier passed to ``get_yaml_path`` to locate the config file.
    """
    config_path = get_yaml_path(uid)
    with open(config_path, "r") as f:
        # An empty YAML document loads as None; fall back to an empty mapping
        # so the assignments below cannot fail.
        config = yaml.load(f, Loader=yaml.FullLoader) or {}
    if use_inference:
        config["inference_type"] = "hf_inference_api"
        config["inference_token"] = inference_token
    else:
        config["inference_type"] = "hf_pipeline"
        # FIXME: A quick and temp fix for missing token
        config["inference_token"] = ""
    # Save inference_type (and token) back to the yaml file.
    with open(config_path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
def read_column_mapping(uid):
    """Return the ``column_mapping`` entry of the YAML config of ``uid``.

    An empty dict is returned when the config file is empty or has no
    ``column_mapping`` key.
    """
    with open(get_yaml_path(uid), "r") as stream:
        loaded = yaml.load(stream, Loader=yaml.FullLoader)
    # Guard clause: nothing usable was loaded from the file.
    if not loaded:
        return {}
    return loaded.get("column_mapping", {})
def write_column_mapping(mapping, uid):
    """Write (or remove) the ``column_mapping`` entry of the YAML config of ``uid``.

    Args:
        mapping: Value to store under ``column_mapping``. ``None`` removes
            the key from the config instead of storing a null value.
        uid: Identifier passed to ``get_yaml_path`` to locate the config file.

    Returns:
        None. The file is left untouched when it holds no YAML document.
    """
    config_path = get_yaml_path(uid)
    with open(config_path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if mapping is None:
        # Drop the key whether or not it is present. Previously a missing key
        # fell through to the assignment branch and `column_mapping: null`
        # was written to the file.
        config.pop("column_mapping", None)
    else:
        config["column_mapping"] = mapping
    with open(config_path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
def convert_column_mapping_to_json(df, label=""):
    """Convert a column-mapping dataframe into a JSON-style dict.

    Args:
        df: Dataframe whose rows are collected; only ``iterrows()`` is used.
        label: Key under which the list of rows is stored.

    Returns:
        A dict with the single key ``label`` mapping to a list that holds
        one list of cell values per dataframe row, in row order.
    """
    return {label: [row.tolist() for _, row in df.iterrows()]}
def get_logs_file(uid):
    """Return the full contents of the shared log file ``./tmp/temp_log``.

    Args:
        uid: Currently unused; the same log file is read for every caller.

    Returns:
        The log text, or the string ``"Log file does not exist"`` when the
        file cannot be opened or read.
    """
    try:
        # The context manager closes the handle; the previous version left
        # the file open after reading.
        with open("./tmp/temp_log", "r") as log_file:
            return log_file.read()
    except (OSError, UnicodeError):
        return "Log file does not exist"
def write_log_to_user_file(id, log):
    """Append ``log`` to the shared log file ``./tmp/temp_log``.

    Args:
        id: Currently unused; every caller appends to the same log file.
        log: Text to append.
    """
    # Make sure the directory exists so the first write does not fail with
    # FileNotFoundError.
    os.makedirs("./tmp", exist_ok=True)
    with open("./tmp/temp_log", "a") as f:
        f.write(log)
def save_job_to_pipe(id, job, description="", lock=None):
    """Append a job to ``pipe.jobs`` as an ``(id, job, description)`` tuple.

    The previous signature declared ``lock`` without a default after the
    defaulted ``description``, which Python rejects as a SyntaxError. Giving
    ``lock`` a default keeps the positional order unchanged.

    Args:
        id: Job identifier stored as the first tuple element.
        job: Job payload stored as the second tuple element.
        description: Text stored as the third tuple element.
        lock: Context manager held while appending. When ``None`` the job is
            appended without taking any lock.
    """
    job_entry = (id, job, description)
    if lock is None:
        pipe.jobs.append(job_entry)
        return
    with lock:
        pipe.jobs.append(job_entry)
def pop_job_from_pipe():
    """Take one job off ``pipe.jobs`` and start it as a subprocess.

    Does nothing when the queue is empty. Otherwise the job's description is
    stored in ``pipe.current``, a "Running job id ..." line is appended to the
    log, and the job's command is launched with stdout and stderr appended to
    ``./tmp/temp_log``. The subprocess is not waited for.
    """
    if not pipe.jobs:
        return
    # NOTE(review): list.pop() removes the most recently appended job (LIFO);
    # confirm this order is intended rather than pop(0).
    job_info = pipe.jobs.pop()
    job_id = job_info[0]
    command = job_info[1]
    pipe.current = job_info[2]
    write_log_to_user_file(job_id, f"Running job id {job_id}\n")
    # The child receives its own copy of the file descriptor when it is
    # spawned, so the parent's handle can be closed as soon as Popen returns
    # instead of being leaked.
    with open("./tmp/temp_log", "a") as log_file:
        subprocess.Popen(
            command,
            stdout=log_file,
            stderr=log_file,
        )
|