bandhav's picture
Base code
e6a6383
raw
history blame
8.22 kB
"""
Test script to evaluate the model.
"""
import argparse
import importlib
import multiprocessing
import os, glob
import logging
import numpy as np
import torch
import pandas as pd
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
from torch.profiler import profile, record_function, ProfilerActivity
from tqdm import tqdm # pylint: disable=unused-import
from torchmetrics.functional import(
scale_invariant_signal_noise_ratio as si_snr,
signal_noise_ratio as snr,
signal_distortion_ratio as sdr,
scale_invariant_signal_distortion_ratio as si_sdr)
from src.helpers import utils
from src.training.synthetic_dataset import FSDSoundScapesDataset, tensorboard_add_metrics
from src.training.synthetic_dataset import tensorboard_add_sample
def test_epoch(model: nn.Module, device: torch.device,
test_loader: torch.utils.data.dataloader.DataLoader,
n_items: int, loss_fn, metrics_fn,
profiling: bool = False, epoch: int = 0,
writer: SummaryWriter = None, data_params = None) -> float:
"""
Evaluate the network.
"""
model.eval()
metrics = {}
with torch.no_grad():
for batch_idx, (mixed, label, gt) in \
enumerate(tqdm(test_loader, desc='Test', ncols=100)):
mixed = mixed.to(device)
label = label.to(device)
gt = gt.to(device)
# Run through the model
with profile(activities=[ProfilerActivity.CPU],
record_shapes=True) as prof:
with record_function("model_inference"):
output = model(mixed, label)
if profiling:
logging.info(
prof.key_averages().table(sort_by="self_cpu_time_total",
row_limit=20))
# Compute loss
loss = loss_fn(output, gt)
# Compute metrics
metrics_batch = metrics_fn(mixed, output, gt)
metrics_batch['loss'] = [loss.item()]
metrics_batch['runtime'] = [prof.profiler.self_cpu_time_total/1000]
for k in metrics_batch.keys():
if not k in metrics:
metrics[k] = metrics_batch[k]
else:
metrics[k] += metrics_batch[k]
if writer is not None:
if batch_idx == 0:
tensorboard_add_sample(
writer, tag='Test',
sample=(mixed[:8], label[:8], gt[:8], output[:8]),
step=epoch, params=data_params)
tensorboard_add_metrics(
writer, tag='Test', metrics=metrics_batch, label=label,
step=epoch)
if n_items is not None and batch_idx == (n_items - 1):
break
avg_metrics = {k: np.mean(metrics[k]) for k in metrics.keys()}
avg_metrics_str = "Test:"
for m in avg_metrics.keys():
avg_metrics_str += ' %s=%.04f' % (m, avg_metrics[m])
logging.info(avg_metrics_str)
return avg_metrics
def evaluate(network, args: argparse.Namespace):
"""
Evaluate the model on a given dataset.
"""
# Load dataset
data_test = FSDSoundScapesDataset(**args.test_data)
logging.info("Loaded test dataset at %s containing %d elements" %
(args.test_data['input_dir'], len(data_test)))
# Set up the device and workers.
use_cuda = args.use_cuda and torch.cuda.is_available()
if use_cuda:
gpu_ids = args.gpu_ids if args.gpu_ids is not None\
else range(torch.cuda.device_count())
device_ids = [_ for _ in gpu_ids]
data_parallel = len(device_ids) > 1
device = 'cuda:%d' % device_ids[0]
torch.cuda.set_device(device_ids[0])
logging.info("Using CUDA devices: %s" % str(device_ids))
else:
data_parallel = False
device = torch.device('cpu')
logging.info("Using device: CPU")
# Set multiprocessing params
num_workers = min(multiprocessing.cpu_count(), args.n_workers)
kwargs = {
'num_workers': num_workers,
'pin_memory': True
} if use_cuda else {}
# Set up data loader
test_loader = torch.utils.data.DataLoader(data_test,
batch_size=args.eval_batch_size,
**kwargs)
# Set up model
model = network.Net(**args.model_params)
if use_cuda and data_parallel:
model = nn.DataParallel(model, device_ids=device_ids)
logging.info("Using data parallel model")
model.to(device)
# Load weights
if args.pretrain_path == "best":
ckpts = glob.glob(os.path.join(args.exp_dir, '*.pt'))
ckpts.sort(
key=lambda _: int(os.path.splitext(os.path.basename(_))[0]))
val_metrics = torch.load(ckpts[-1])['val_metrics'][args.base_metric]
best_epoch = max(range(len(val_metrics)), key=val_metrics.__getitem__)
args.pretrain_path = os.path.join(args.exp_dir, '%d.pt' % best_epoch)
logging.info(
"Found 'best' validation %s=%.02f at %s" %
(args.base_metric, val_metrics[best_epoch], args.pretrain_path))
if args.pretrain_path != "":
utils.load_checkpoint(
args.pretrain_path, model, data_parallel=data_parallel)
logging.info("Loaded pretrain weights from %s" % args.pretrain_path)
# Evaluate
try:
return test_epoch(
model, device, test_loader, args.n_items, network.loss,
network.metrics, args.profiling)
except KeyboardInterrupt:
print("Interrupted")
except Exception as _: # pylint: disable=broad-except
import traceback # pylint: disable=import-outside-toplevel
traceback.print_exc()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# Data Params
parser.add_argument('experiments', nargs='+', type=str,
default=None,
help="List of experiments to evaluate. "
"Provide only one experiment when providing "
"pretrained path. If pretrianed path is not "
"provided, epoch with best validation metric "
"is used for evaluation.")
parser.add_argument('--results', type=str, default="",
help="Path to the CSV file to store results.")
# System params
parser.add_argument('--n_items', type=int, default=None,
help="Number of items to test.")
parser.add_argument('--pretrain_path', type=str, default="best",
help="Path to pretrained weights")
parser.add_argument('--profiling', dest='profiling', action='store_true',
help="Enable or disable profiling.")
parser.add_argument('--use_cuda', dest='use_cuda', action='store_true',
help="Whether to use cuda")
parser.add_argument('--gpu_ids', nargs='+', type=int, default=None,
help="List of GPU ids used for training. "
"Eg., --gpu_ids 2 4. All GPUs are used by default.")
args = parser.parse_args()
results = []
for exp_dir in args.experiments:
eval_args = argparse.Namespace(**vars(args))
eval_args.exp_dir = exp_dir
utils.set_logger(os.path.join(exp_dir, 'eval.log'))
logging.info("Evaluating %s ..." % exp_dir)
# Load model and training params
params = utils.Params(os.path.join(exp_dir, 'config.json'))
for k, v in params.__dict__.items():
vars(eval_args)[k] = v
network = importlib.import_module(eval_args.model)
logging.info("Imported the model from '%s'." % eval_args.model)
curr_res = evaluate(network, eval_args)
curr_res['experiment'] = os.path.basename(exp_dir)
results.append(curr_res)
del eval_args
if args.results != "":
print("Writing results to %s" % args.results)
pd.DataFrame(results).to_csv(args.results, index=False)