Spaces:
Sleeping
Sleeping
File size: 3,767 Bytes
8f809e2 3573a39 136af2d 3573a39 3a0ee14 136af2d 3573a39 3a0ee14 3573a39 136af2d 3573a39 3a0ee14 136af2d 3a0ee14 136af2d 3a0ee14 9e4233f 136af2d 3a0ee14 3573a39 3a0ee14 136af2d 3a0ee14 be473e6 136af2d 3a0ee14 3573a39 3a0ee14 136af2d 3a0ee14 136af2d 3a0ee14 9e4233f 136af2d 3a0ee14 3573a39 3a0ee14 136af2d 3a0ee14 3573a39 3a0ee14 3573a39 be473e6 136af2d 3573a39 9e4233f 136af2d 9e4233f 136af2d 9e4233f 8f809e2 136af2d 9e4233f 3573a39 9e4233f 136af2d 9e4233f 136af2d 8f809e2 9e4233f 136af2d 9e4233f 136af2d 9e4233f 3573a39 3a0ee14 9e4233f 8f809e2 3573a39 8f809e2 3573a39 8f809e2 3573a39 8f809e2 136af2d 8f809e2 136af2d 8f809e2 136af2d 041cafd 136af2d 8f809e2 041cafd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
import os
import shutil
import subprocess

import yaml

import pipe
# Directory that holds the template config.yaml and the per-user "{uid}_config.yaml" copies.
YAML_PATH = "./configs"
class Dumper(yaml.Dumper):
    """YAML dumper whose indentation step always runs with ``indentless=False``."""

    def increase_indent(self, flow=False, *args, **kwargs):
        """Forward to the base dumper, forcing ``indentless`` to False.

        Only ``flow`` is passed through; any extra positional or keyword
        arguments (including a caller-supplied ``indentless``) are accepted
        and deliberately dropped.
        """
        forced = {"flow": flow, "indentless": False}
        return super().increase_indent(**forced)
def get_yaml_path(uid):
    """Return the path of the per-user config file for ``uid``, creating it if needed.

    The config directory is created when missing. If the user has no config
    file yet, the template ``config.yaml`` from the same directory is copied
    to ``{uid}_config.yaml``.

    Args:
        uid: Identifier used as the file-name prefix of the user's config.

    Returns:
        The string ``"{YAML_PATH}/{uid}_config.yaml"``.
    """
    # exist_ok avoids the check-then-create race between concurrent callers.
    os.makedirs(YAML_PATH, exist_ok=True)
    user_config = f"{YAML_PATH}/{uid}_config.yaml"
    if not os.path.exists(user_config):
        # Copy in-process rather than shelling out to `cp`: uid is interpolated
        # into the path, so a shell command line would be open to injection
        # and would not work on platforms without `cp`.
        try:
            shutil.copy(f"{YAML_PATH}/config.yaml", user_config)
        except OSError:
            # A failed copy (e.g. missing template) was never fatal here: the
            # exit status of `cp` used to be ignored. Keep that contract and
            # let the caller's own open() report the missing file.
            pass
    return user_config
def read_scanners(uid):
    """Read the scanners stored under ``detectors`` in the user's YAML config.

    Args:
        uid: Identifier of the user whose config file is read.

    Returns:
        The value of the ``detectors`` key, or an empty list when the key is
        absent or the config file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML file loads as None; treat it like a config without
    # detectors instead of failing on None.get().
    if not config:
        return []
    return config.get("detectors", [])
def write_scanners(scanners, uid):
    """Store ``scanners`` under the ``detectors`` key of the user's YAML config.

    Nothing is written when the existing config is empty.

    Args:
        scanners: Value to store under ``detectors``.
        uid: Identifier of the user whose config file is updated.
    """
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if not config:
        return
    config["detectors"] = scanners
    # Reopen in "w" mode so the file is truncated before dumping. Dumping
    # through the handle that was just read to EOF would append the new
    # document after the old content instead of replacing it.
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
def read_inference_type(uid):
    """Read the ``inference_type`` value from the user's YAML config.

    Args:
        uid: Identifier of the user whose config file is read.

    Returns:
        The value of the ``inference_type`` key, or an empty string when the
        key is absent or the config file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML file loads as None; fall back to the same default that is
    # used for a missing key instead of failing on None.get().
    if not config:
        return ""
    return config.get("inference_type", "")
def write_inference_type(use_inference, uid):
    """Record the inference backend in the user's YAML config.

    Sets ``inference_type`` to ``"hf_inference_api"`` when ``use_inference``
    is truthy and to ``"hf_pipeline"`` otherwise. Nothing is written when the
    existing config file is empty.

    Args:
        use_inference: Selects which of the two values is stored.
        uid: Identifier of the user whose config file is updated.
    """
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty file loads as None and cannot take item assignment; skip the
    # update, as write_column_mapping does for the same situation.
    if config is None:
        return
    config["inference_type"] = "hf_inference_api" if use_inference else "hf_pipeline"
    # Reopen in "w" mode so the file is truncated before dumping. Dumping
    # through the handle that was just read to EOF would append the new
    # document after the old content instead of replacing it.
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
def read_column_mapping(uid):
    """Return the ``column_mapping`` entry of the user's YAML config.

    Falls back to an empty dict when the config is empty or has no
    ``column_mapping`` key.
    """
    with open(get_yaml_path(uid), "r") as stream:
        loaded = yaml.load(stream, Loader=yaml.FullLoader)
    if not loaded:
        return {}
    return loaded.get("column_mapping", {})
def write_column_mapping(mapping, uid):
    """Store or remove the ``column_mapping`` entry of the user's YAML config.

    Nothing is written when the existing config file is empty.

    Args:
        mapping: Value to store under ``column_mapping``. ``None`` removes
            the key instead.
        uid: Identifier of the user whose config file is updated.
    """
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if mapping is None:
        # Removing must be a no-op when the key is absent. Falling through to
        # the assignment would write "column_mapping: null", which readers
        # then get back as None instead of the empty-dict default.
        config.pop("column_mapping", None)
    else:
        config["column_mapping"] = mapping
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
def convert_column_mapping_to_json(df, label=""):
    """Turn a dataframe into a one-key dict of row lists.

    Returns ``{label: rows}`` where ``rows`` holds one list per dataframe
    row (``row.tolist()``), in iteration order.
    """
    rows = [record.tolist() for _index, record in df.iterrows()]
    return {label: rows}
def get_logs_file(uid):
    """Return the full contents of the log file ``./tmp/{uid}_log``.

    Args:
        uid: Identifier used as the log file-name prefix.

    Returns:
        The file contents, or the string ``"Log file does not exist"`` when
        the file cannot be opened or read.
    """
    try:
        # The context manager closes the handle on every path; it used to be
        # left open until garbage collection.
        with open(f"./tmp/{uid}_log", "r") as file:
            return file.read()
    except Exception:
        # Deliberately best-effort: any failure to read is reported to the
        # caller as a missing log rather than raised.
        return "Log file does not exist"
def write_log_to_user_file(id, log):
    """Append ``log`` to the log file ``./tmp/{id}_log``.

    Args:
        id: Identifier used as the log file-name prefix.
        log: Text to append.
    """
    # The log directory may not exist yet on a fresh checkout; create it so
    # the first write does not fail with FileNotFoundError.
    os.makedirs("./tmp", exist_ok=True)
    with open(f"./tmp/{id}_log", "a") as f:
        f.write(log)
def save_job_to_pipe(id, job, lock):
    """Queue ``(id, job)`` at the end of ``pipe.jobs`` while holding ``lock``."""
    entry = (id, job)
    with lock:
        pipe.jobs.append(entry)
def pop_job_from_pipe():
    """Take one job off ``pipe.jobs`` and launch it as a child process.

    Does nothing when the queue is empty. Otherwise a "Running job id ..."
    line is appended to the job's log file and the job's command is started
    in the ``cicd`` directory next to this file, with its stdout and stderr
    appended to the same log file. The call does not wait for the child to
    finish.
    """
    if len(pipe.jobs) == 0:
        return
    # NOTE(review): pop() with no index takes the most recently appended
    # entry if pipe.jobs is a list, i.e. LIFO order - confirm this is intended.
    job_info = pipe.jobs.pop()
    job_id, command = job_info[0], job_info[1]
    write_log_to_user_file(job_id, f"Running job id {job_id}\n")
    # The child process receives its own duplicate of the log handle, so the
    # parent can close its copy as soon as Popen returns. The handle used to
    # be left open for every job launched.
    with open(f"./tmp/{job_id}_log", "a") as log_file:
        subprocess.Popen(
            command,
            cwd=os.path.join(os.path.dirname(os.path.realpath(__file__)), "cicd"),
            stdout=log_file,
            stderr=log_file,
        )
|