Spaces:
Runtime error
Runtime error
File size: 907 Bytes
c7a96cd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import grpc
from google.rpc import status_pb2, code_pb2
from grpc_status import rpc_status
from grpc_interceptor.server import AsyncServerInterceptor
from loguru import logger
from typing import Callable, Any
class ExceptionInterceptor(AsyncServerInterceptor):
async def intercept(
self,
method: Callable,
request_or_iterator: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
try:
response = method(request_or_iterator, context)
return await response
except Exception as err:
method_name = method_name.split("/")[-1]
logger.exception(f"Method {method_name} encountered an error.")
await context.abort_with_status(
rpc_status.to_status(
status_pb2.Status(code=code_pb2.INTERNAL, message=str(err))
)
)
|