# -*- coding:utf-8 -*-
# @FileName :logger.py
# @Time :2023/8/8 20:17
# @Author :lovemefan
# @Email :[email protected]
"""LOGGER Module"""
import logging
import logging.config
import logging.handlers
import os
import sys
from functools import wraps
from typing import List, Tuple, Union
logger_list = []
LEVEL = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
_LOG_FILE_DIR = "paraformer_logs/"
LOCAL_DEFAULT_LOG_FILE_DIR = os.path.join(
os.getenv("LOCAL_DEFAULT_PATH", _LOG_FILE_DIR), "log"
)
DEFAULT_FILEHANDLER_FORMAT = (
"[%(levelname)s] %(asctime)s " "[%(pathname)s:%(lineno)d] %(funcName)s: %(message)s"
)
DEFAULT_STDOUT_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
DEFAULT_REDIRECT_FILE_NAME = "paraformer-online.log"
class StreamRedirector:
"""Stream Re-director for Log."""
def __init__(self, source_stream, target_stream):
"""Redirects the source stream to the target stream.
Args:
source_stream: Source stream.
target_stream: Target stream.
"""
super(StreamRedirector, self).__init__()
self.source_stream = source_stream
self.target_stream = target_stream
self.save_source_stream_fd = os.dup(self.source_stream.fileno())
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
self.start()
func(*args, **kwargs)
self.stop()
return wrapper
    def __enter__(self):
        self.start()
        return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def start(self):
"""start."""
self.source_stream.flush()
os.dup2(self.target_stream.fileno(), self.source_stream.fileno())
def stop(self):
"""stop."""
self.source_stream.flush()
os.dup2(self.save_source_stream_fd, self.source_stream.fileno())
self.target_stream.flush()
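
# Illustrative usage sketch (not part of the original module): StreamRedirector
# can wrap a block either as a context manager or as a decorator. The target
# file below is opened only for the sake of the example.
#
#     with open(DEFAULT_REDIRECT_FILE_NAME, "w") as target:
#         with StreamRedirector(sys.stdout, target):
#             print("this line is written to the target file instead of stdout")
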
def validate_nodes_devices_input(var_name: str, var):
"""Check the list of nodes or devices.
Args:
var_name (str): Variable name.
        var: The variable to be checked.
Returns:
None
"""
if not (var is None or isinstance(var, (list, tuple, dict))):
raise TypeError(
"The value of {} can be None or a value of type tuple, "
"list, or dict.".format(var_name)
)
if isinstance(var, (list, tuple)):
for item in var:
if not isinstance(item, int):
raise TypeError(
"The elements of a variable of type list or "
"tuple must be of type int."
)
def validate_level(var_name: str, var):
"""Verify that the log level is correct.
Args:
var_name (str): Variable name.
        var: The log level to be checked.
Returns:
None
"""
if not isinstance(var, str):
raise TypeError("The format of {} must be of type str.".format(var_name))
if var not in LEVEL:
raise ValueError("{}={} needs to be in {}".format(var_name, var, LEVEL))
def validate_std_input_format(
to_std: bool,
stdout_nodes: Union[List, Tuple, None],
stdout_devices: Union[List, Tuple, None],
stdout_level: str,
):
"""Validate the input about stdout of the get_logger function."""
if not isinstance(to_std, bool):
raise TypeError("The format of the to_std must be of type bool.")
validate_nodes_devices_input("stdout_nodes", stdout_nodes)
validate_nodes_devices_input("stdout_devices", stdout_devices)
validate_level("stdout_level", stdout_level)
def validate_file_input_format(
file_level: Union[List[str], Tuple[str]],
file_save_dir: str,
    append_rank_dir: bool,
file_name: Union[List[str], Tuple[str]],
):
"""Validate the input about file of the get_logger function."""
if not isinstance(file_level, (tuple, list)):
raise TypeError("The value of file_level should be list or a tuple.")
for level in file_level:
validate_level("level in file_level", level)
if not len(file_level) == len(file_name):
raise ValueError("The length of file_level and file_name should be equal.")
if not isinstance(file_save_dir, str):
raise TypeError("The value of file_save_dir should be a value of type str.")
if not isinstance(append_rank_dir, bool):
raise TypeError("The value of append_rank_dir should be a value of type bool.")
if not isinstance(file_name, (tuple, list)):
raise TypeError("The value of file_name should be list or a tuple.")
for name in file_name:
if not isinstance(name, str):
raise TypeError(
"The value of name in file_name should be a value of type str."
)
def _convert_level(level: str) -> int:
"""Convert the format of the log to logging level.
Args:
level (str): User log level.
Returns:
level (str): Logging level.
"""
level_convert = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
level = level_convert.get(level, logging.INFO)
return level
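
# Illustrative note: _convert_level("DEBUG") returns logging.DEBUG, while any
# name missing from level_convert falls back to logging.INFO.
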
def get_logger(logger_name: str = "paraformer", **kwargs) -> logging.Logger:
"""Get the logger. Both computing centers and bare metal servers are
available.
Args:
logger_name (str): Logger name.
        kwargs (dict): Optional keyword arguments:
to_std (bool): If set to True, output the log to stdout.
stdout_level (str): The level of the log output to stdout.
                Options are DEBUG, INFO, WARNING, ERROR, CRITICAL.
stdout_format (str): Log format.
            file_level (list[str] or tuple[str]): The levels of the logs written to files,
                e.g. ['INFO', 'ERROR'] means that records at or above INFO go to the first
                file and records at or above ERROR go to the second. The length of the
                list must equal the length of file_name.
file_save_dir (str): The folder where the log files are stored.
append_rank_dir (bool): Whether to add a folder with the format rank{}.
            file_name (list[str] or tuple[str]): Output file names, one per entry in file_level.
max_file_size (int): The maximum size of a single log file. Unit: MB.
max_num_of_files (int): The maximum number of files to save.
Returns:
logger (logging.Logger): Logger.
"""
mf_logger = logging.getLogger(logger_name)
if logger_name in logger_list:
return mf_logger
to_std = kwargs.get("to_std", True)
stdout_nodes = kwargs.get("stdout_nodes", None)
def get_stdout_devices():
if os.getenv("STDOUT_DEVICES"):
devices = os.getenv("STDOUT_DEVICES")
if devices.startswith(("(", "[")) and devices.endswith((")", "]")):
devices = devices[1:-1]
devices = tuple(map(lambda x: int(x.strip()), devices.split(",")))
else:
devices = kwargs.get("stdout_devices", None)
return devices
stdout_devices = get_stdout_devices()
stdout_level = kwargs.get("stdout_level", "INFO")
stdout_format = kwargs.get("stdout_format", "")
file_level = kwargs.get("file_level", ("INFO", "ERROR"))
file_save_dir = kwargs.get("file_save_dir", "")
append_rank_dir = kwargs.get("append_rank_dir", True)
file_name = kwargs.get("file_name", ("info.log", "error.log"))
max_file_size = kwargs.get("max_file_size", 50)
max_num_of_files = kwargs.get("max_num_of_files", 5)
validate_std_input_format(to_std, stdout_nodes, stdout_devices, stdout_level)
validate_file_input_format(file_level, file_save_dir, append_rank_dir, file_name)
if to_std:
if not stdout_format:
stdout_format = DEFAULT_STDOUT_FORMAT
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(_convert_level(stdout_level))
stream_formatter = logging.Formatter(stdout_format)
stream_handler.setFormatter(stream_formatter)
mf_logger.addHandler(stream_handler)
logging_level = []
for level in file_level:
logging_level.append(_convert_level(level))
if not file_save_dir:
file_save_dir = LOCAL_DEFAULT_LOG_FILE_DIR
file_path = []
for name in file_name:
path = os.path.join(file_save_dir, name)
path = os.path.realpath(path)
base_dir = os.path.dirname(path)
if not os.path.exists(base_dir):
os.makedirs(base_dir, exist_ok=True)
file_path.append(path)
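    # RotatingFileHandler expects maxBytes, so convert the configured size from MB to bytes.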
max_file_size = max_file_size * 1024 * 1024
file_formatter = logging.Formatter(DEFAULT_FILEHANDLER_FORMAT)
for i, level in enumerate(file_level):
file_handler = logging.handlers.RotatingFileHandler(
filename=file_path[i], maxBytes=max_file_size, backupCount=max_num_of_files
)
        file_handler.setLevel(logging_level[i])
file_handler.setFormatter(file_formatter)
mf_logger.addHandler(file_handler)
mf_logger.setLevel(_convert_level("INFO"))
mf_logger.propagate = False
logger_list.append(logger_name)
return mf_logger
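
# Illustrative usage sketch (the logger name and directory below are hypothetical):
#
#     demo_logger = get_logger(
#         "demo",
#         stdout_level="INFO",
#         file_level=("INFO", "ERROR"),
#         file_name=("info.log", "error.log"),
#         file_save_dir="demo_logs",
#         max_file_size=10,
#         max_num_of_files=3,
#     )
#     demo_logger.info("written to stdout and demo_logs/info.log")
#     demo_logger.error("also written to demo_logs/error.log")
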
class _LogActionOnce:
"""
    A wrapper that replaces the logger's warning method with a no-op after the
    first call for a given key, so that a warning is logged only once instead
    of repeatedly.

    Args:
        m_logger (logging.Logger): The logger object.
        key (str): Identifier of the warning; each key is warned at most once.
        no_warning (bool): If True, suppress the warning entirely.
"""
is_logged = dict()
def __init__(self, m_logger, key, no_warning=False):
self.logger = m_logger
self.key = key
self.no_warning = no_warning
def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
if not hasattr(self.logger, "warning"):
return func(*args, **kwargs)
old_func = self.logger.warning
if self.no_warning or self.key in _LogActionOnce.is_logged:
                self.logger.warning = lambda *args, **kwargs: None
else:
_LogActionOnce.is_logged[self.key] = True
res = func(*args, **kwargs)
if hasattr(self.logger, "warning"):
self.logger.warning = old_func
return res
return wrapper
logger = get_logger(stdout_level="DEBUG")