# mm3dtest/tools/deployment/mmdet3d_handler.py
# giantmonkeyTC
# 2344
# 34d1f8b
# Copyright (c) OpenMMLab. All rights reserved.
import base64
import os
import numpy as np
import torch
from ts.torch_handler.base_handler import BaseHandler
from mmdet3d.apis import inference_detector, init_model
from mmdet3d.structures.points import get_points_type
class MMdet3dHandler(BaseHandler):
    """MMDetection3D Handler used in TorchServe.

    Handler to load models in MMDetection3D, and it will process data to get
    predicted results. For now, it only supports SECOND.
    """

    # Predictions whose score is not strictly greater than this value are
    # dropped in ``postprocess``.
    threshold = 0.5
    # Number of float32 values per point in the raw request buffer; used as
    # the row width when reshaping in ``preprocess``.
    load_dim = 4
    # Column indices (out of ``load_dim``) that are kept for every point.
    use_dim = [0, 1, 2, 3]
    # Coordinate type name handed to ``get_points_type``.
    coord_type = 'LIDAR'
    # Forwarded unchanged as ``attribute_dims`` to the points class.
    attribute_dims = None

    def initialize(self, context):
        """Initialize function loads the model in MMDetection3D.

        Sets ``self.map_location``, ``self.device``, ``self.manifest``,
        ``self.config_file``, ``self.model`` and ``self.initialized``.

        Args:
            context (context): It is a JSON Object containing information
                pertaining to the model artifacts parameters.
        """
        properties = context.system_properties
        gpu_id = properties.get('gpu_id')

        # Only bind to CUDA when a GPU id was actually assigned to this
        # worker. Without this guard a missing ``gpu_id`` produced the
        # device string 'cuda:None', which ``torch.device`` rejects.
        if torch.cuda.is_available() and gpu_id is not None:
            self.map_location = 'cuda'
            self.device = torch.device(f'{self.map_location}:{gpu_id}')
        else:
            self.map_location = 'cpu'
            self.device = torch.device(self.map_location)

        self.manifest = context.manifest

        model_dir = properties.get('model_dir')
        serialized_file = self.manifest['model']['serializedFile']
        checkpoint = os.path.join(model_dir, serialized_file)
        # The model config is expected next to the checkpoint as config.py.
        self.config_file = os.path.join(model_dir, 'config.py')

        self.model = init_model(self.config_file, checkpoint, self.device)
        self.initialized = True

    def preprocess(self, data):
        """Preprocess function converts data into LiDARPoints class.

        Every row is decoded, but only the points built from the LAST row
        are returned; earlier rows are overwritten.

        Args:
            data (List): Input data from the request. Each row is a mapping
                whose ``'data'`` or ``'body'`` entry holds the raw float32
                point buffer, either as bytes or as a base64 string.

        Returns:
            `LiDARPoints` : The preprocess function returns the input
                point cloud data as LiDARPoints class.

        Raises:
            ValueError: If ``data`` contains no rows.
        """
        points_class = get_points_type(self.coord_type)
        points = None

        for row in data:
            # Compat layer: normally the envelope should just return the data
            # directly, but older versions of Torchserve didn't have envelope.
            pts = row.get('data') or row.get('body')
            if isinstance(pts, str):
                pts = base64.b64decode(pts)

            raw = np.frombuffer(pts, dtype=np.float32)
            raw = raw.reshape(-1, self.load_dim)
            raw = raw[:, self.use_dim]
            points = points_class(
                raw,
                points_dim=raw.shape[-1],
                attribute_dims=self.attribute_dims)

        if points is None:
            # Previously an empty batch surfaced as an UnboundLocalError.
            raise ValueError('preprocess received an empty request batch')

        return points

    def inference(self, data):
        """Inference Function.

        This function is used to make a prediction call on the
        given input request.

        Args:
            data (`LiDARPoints`): LiDARPoints class passed to make
                the inference request.

        Returns:
            List(dict) : The predicted result is returned in this function.
        """
        # ``inference_detector`` returns a pair; only the first element
        # (the predictions) is used.
        results, _ = inference_detector(self.model, data)
        return results

    def postprocess(self, data):
        """Postprocess function.

        This function makes use of the output from the inference and
        converts it into a torchserve supported response output.

        Args:
            data (List[dict]): The data received from the prediction
                output of the model.

        Returns:
            List: One entry per element of ``data``. Each entry is a
                single-element list holding a dict with the keys
                ``'3dbbox'`` and ``'score'``, restricted to predictions
                whose score exceeds ``self.threshold``.
        """
        output = []
        for result in data:
            # Predictions may be nested under 'pts_bbox' or stored directly
            # on the result.
            preds = result['pts_bbox'] if 'pts_bbox' in result else result
            pred_bboxes = preds['boxes_3d'].numpy()
            pred_scores = preds['scores_3d'].numpy()

            keep = pred_scores > self.threshold
            output.append([{
                '3dbbox': pred_bboxes[keep].tolist(),
                'score': pred_scores[keep].tolist(),
            }])

        return output