Spaces:
Runtime error
Runtime error
import grpc | |
from opentelemetry import trace | |
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
from opentelemetry.instrumentation.grpc._aio_server import ( | |
OpenTelemetryAioServerInterceptor, | |
) | |
from opentelemetry.semconv.trace import SpanAttributes | |
from opentelemetry.sdk.resources import Resource | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.sdk.trace.export import ( | |
BatchSpanProcessor, | |
) | |
class UDSOpenTelemetryAioServerInterceptor(OpenTelemetryAioServerInterceptor): | |
def __init__(self): | |
super().__init__(trace.get_tracer(__name__)) | |
def _start_span(self, handler_call_details, context, set_status_on_exception=False): | |
""" | |
Rewrite _start_span method to support Unix Domain Socket gRPC contexts | |
""" | |
# standard attributes | |
attributes = { | |
SpanAttributes.RPC_SYSTEM: "grpc", | |
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], | |
} | |
# if we have details about the call, split into service and method | |
if handler_call_details.method: | |
service, method = handler_call_details.method.lstrip("/").split("/", 1) | |
attributes.update( | |
{ | |
SpanAttributes.RPC_METHOD: method, | |
SpanAttributes.RPC_SERVICE: service, | |
} | |
) | |
# add some attributes from the metadata | |
metadata = dict(context.invocation_metadata()) | |
if "user-agent" in metadata: | |
attributes["rpc.user_agent"] = metadata["user-agent"] | |
# We use gRPC over a UNIX socket | |
attributes.update({SpanAttributes.NET_TRANSPORT: "unix"}) | |
return self._tracer.start_as_current_span( | |
name=handler_call_details.method, | |
kind=trace.SpanKind.SERVER, | |
attributes=attributes, | |
set_status_on_exception=set_status_on_exception, | |
) | |
def setup_tracing(shard: int, otlp_endpoint: str): | |
resource = Resource.create( | |
attributes={"service.name": f"text-generation-inference.server-{shard}"} | |
) | |
span_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True) | |
span_processor = BatchSpanProcessor(span_exporter) | |
trace.set_tracer_provider(TracerProvider(resource=resource)) | |
trace.get_tracer_provider().add_span_processor(span_processor) | |