From a979c73563c182548bd333be6afd5a945a37a383 Mon Sep 17 00:00:00 2001 From: William Conti <58711692+wconti27@users.noreply.github.com> Date: Fri, 17 May 2024 14:02:34 -0400 Subject: [PATCH] fix(grpc): fix segfault with grpc.aio streaming responses [backport 1.20] (#9276) Backport https://github.com/DataDog/dd-trace-py/commit/5897cabe7651f30b0f865ed211c84e1316b49628 from https://github.com/DataDog/dd-trace-py/pull/9233 to 1.20. This PR fixes a few issues with the grpc aio integration. Most notably, the integration was causing segfaults when wrapping async stream responses, most likely since these spans were never being finished. This issue was uncovered when customers upgraded their google-api-core dependencies to 2.17.0; with this upgrade, the package changed many grpc calls to use async streaming. In addition to fixing the segfault, this PR also fixes the Pin object to be correctly placed on the grpcio module. Fixes https://github.com/DataDog/dd-trace-py/issues/9139 ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. ## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com> --- .../contrib/grpc/aio_client_interceptor.py | 64 +++++- ddtrace/contrib/grpc/patch.py | 7 +- ddtrace/contrib/grpc/utils.py | 28 +++ docs/spelling_wordlist.txt | 1 + ...gle-api-core-upgrade-abbc097a46b5d032.yaml | 4 + .../grpc_aio/hellostreamingworld_pb2.py | 36 +++ .../grpc_aio/hellostreamingworld_pb2.pyi | 23 ++ .../grpc_aio/hellostreamingworld_pb2_grpc.py | 77 +++++++ tests/contrib/grpc_aio/test_grpc_aio.py | 210 +++++++++++++++++- 9 files changed, 425 insertions(+), 25 deletions(-) create mode 100644 releasenotes/notes/fix-grpc-bug-caused-by-google-api-core-upgrade-abbc097a46b5d032.yaml create mode 100644 tests/contrib/grpc_aio/hellostreamingworld_pb2.py create mode 100644 tests/contrib/grpc_aio/hellostreamingworld_pb2.pyi create mode 100644 tests/contrib/grpc_aio/hellostreamingworld_pb2_grpc.py diff --git a/ddtrace/contrib/grpc/aio_client_interceptor.py b/ddtrace/contrib/grpc/aio_client_interceptor.py index 8e63104aa3d..c6e49f290bb 100644 --- a/ddtrace/contrib/grpc/aio_client_interceptor.py +++ b/ddtrace/contrib/grpc/aio_client_interceptor.py @@ -27,11 +27,15 @@ from ...ext import SpanKind from ...ext import SpanTypes from ...internal.compat import to_unicode +from ...internal.logger import get_logger from ...propagation.http import HTTPPropagator from ..grpc import constants from ..grpc import utils +log = get_logger(__name__) + + def create_aio_client_interceptors(pin, host, port): # type: (Pin, str, int) -> Tuple[aio.ClientInterceptor, ...] return ( @@ -42,7 +46,17 @@ def create_aio_client_interceptors(pin, host, port): ) -def _done_callback(span, code, details): +def _handle_add_callback(call, callback): + try: + call.add_done_callback(callback) + except NotImplementedError: + # add_done_callback is not implemented in UnaryUnaryCallResponse + # https://github.com/grpc/grpc/blob/c54c69dcdd483eba78ed8dbc98c60a8c2d069758/src/python/grpcio/grpc/aio/_interceptor.py#L1058 + # If callback is not called, we need to finish the span here + callback(call) + + +def _done_callback_unary(span, code, details): # type: (Span, grpc.StatusCode, str) -> Callable[[aio.Call], None] def func(call): # type: (aio.Call) -> None @@ -51,15 +65,45 @@ def func(call): # Handle server-side error in unary response RPCs if code != grpc.StatusCode.OK: - _handle_error(span, call, code, details) + _handle_error(span, code, details) + finally: + span.finish() + + return func + + +def _done_callback_stream(span): + # type: (Span) -> Callable[[aio.Call], None] + def func(call): + # type: (aio.Call) -> None + try: + if call.done(): + # check to ensure code and details are not already set, in which case this span + # is an error span and already has all error tags from `_handle_cancelled_error` + code_tag = span.get_tag(constants.GRPC_STATUS_CODE_KEY) + details_tag = span.get_tag(ERROR_MSG) + if not code_tag or not details_tag: + # we need to call __repr__ as we cannot call code() or details() since they are both async + code, details = utils._parse_rpc_repr_string(call.__repr__(), grpc) + + span.set_tag_str(constants.GRPC_STATUS_CODE_KEY, to_unicode(code)) + + # Handle server-side error in unary response RPCs + if code != grpc.StatusCode.OK: + _handle_error(span, code, details) + else: + log.warning("Grpc call has not completed, unable to set status code and details on span.") + except ValueError: + # ValueError is thrown from _parse_rpc_repr_string + log.warning("Unable to parse async grpc string for status code and details.") finally: span.finish() return func -def _handle_error(span, call, code, details): - # type: (Span, aio.Call, grpc.StatusCode, str) -> None +def _handle_error(span, code, details): + # type: (Span, grpc.StatusCode, str) -> None span.error = 1 span.set_tag_str(ERROR_MSG, details) span.set_tag_str(ERROR_TYPE, to_unicode(code)) @@ -152,13 +196,13 @@ async def _wrap_stream_response( ): # type: (...) -> ResponseIterableType try: + _handle_add_callback(call, _done_callback_stream(span)) async for response in call: yield response - code = await call.code() - details = await call.details() - # NOTE: The callback is registered after the iteration is done, - # otherwise `call.code()` and `call.details()` block indefinitely. - call.add_done_callback(_done_callback(span, code, details)) + except StopAsyncIteration: + # Callback will handle span finishing + _handle_cancelled_error(call, span) + raise except aio.AioRpcError as rpc_error: # NOTE: We can also handle the error in done callbacks, # but reuse this error handling function used in unary response RPCs. @@ -184,7 +228,7 @@ async def _wrap_unary_response( # NOTE: As both `code` and `details` are available after the RPC is done (= we get `call` object), # and we can't call awaitable functions inside the non-async callback, # there is no other way but to register the callback here. - call.add_done_callback(_done_callback(span, code, details)) + _handle_add_callback(call, _done_callback_unary(span, code, details)) return call except aio.AioRpcError as rpc_error: # NOTE: `AioRpcError` is raised in `await continuation(...)` diff --git a/ddtrace/contrib/grpc/patch.py b/ddtrace/contrib/grpc/patch.py index 0d823a14e58..f46069b01f6 100644 --- a/ddtrace/contrib/grpc/patch.py +++ b/ddtrace/contrib/grpc/patch.py @@ -196,7 +196,7 @@ def _unpatch_aio_server(): def _client_channel_interceptor(wrapped, instance, args, kwargs): channel = wrapped(*args, **kwargs) - pin = Pin.get_from(channel) + pin = Pin.get_from(constants.GRPC_PIN_MODULE_CLIENT) if not pin or not pin.enabled(): return channel @@ -207,11 +207,10 @@ def _client_channel_interceptor(wrapped, instance, args, kwargs): def _aio_client_channel_interceptor(wrapped, instance, args, kwargs): - channel = wrapped(*args, **kwargs) + pin = Pin.get_from(GRPC_AIO_PIN_MODULE_CLIENT) - pin = Pin.get_from(channel) if not pin or not pin.enabled(): - return channel + return wrapped(*args, **kwargs) (host, port) = utils._parse_target_from_args(args, kwargs) diff --git a/ddtrace/contrib/grpc/utils.py b/ddtrace/contrib/grpc/utils.py index 306d338c948..5d08b7dd41b 100644 --- a/ddtrace/contrib/grpc/utils.py +++ b/ddtrace/contrib/grpc/utils.py @@ -1,4 +1,5 @@ import logging +import re from ddtrace.internal.compat import parse @@ -74,3 +75,30 @@ def _parse_target_from_args(args, kwargs): return hostname, port except ValueError: log.warning("Malformed target '%s'.", target) + + +def _parse_rpc_repr_string(rpc_string, module): + # Define the regular expression patterns to extract status and details + status_pattern = r"status\s*=\s*StatusCode\.(\w+)" + details_pattern = r'details\s*=\s*"([^"]*)"' + + # Search for the status and details in the input string + status_match = re.search(status_pattern, rpc_string) + details_match = re.search(details_pattern, rpc_string) + + if not status_match or not details_match: + raise ValueError("Unable to parse grpc status or details repr string") + + # Extract the status and details from the matches + status_str = status_match.group(1) + details = details_match.group(1) + + # Convert the status string to a grpc.StatusCode object + try: + code = module.StatusCode[status_str] + except KeyError: + code = None + raise ValueError("Invalid grpc status code: " + status_str) + + # Return the status code and details + return code, details diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index d2da2a8e67a..f5fbd18ffdb 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -15,6 +15,7 @@ MySQL OpenTracing Runtimes SpanContext +aio aiobotocore aiohttp aiomysql diff --git a/releasenotes/notes/fix-grpc-bug-caused-by-google-api-core-upgrade-abbc097a46b5d032.yaml b/releasenotes/notes/fix-grpc-bug-caused-by-google-api-core-upgrade-abbc097a46b5d032.yaml new file mode 100644 index 00000000000..485776c04cc --- /dev/null +++ b/releasenotes/notes/fix-grpc-bug-caused-by-google-api-core-upgrade-abbc097a46b5d032.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + fix(grpc): This change fixes a bug in the grpc.aio support specific to streaming responses. diff --git a/tests/contrib/grpc_aio/hellostreamingworld_pb2.py b/tests/contrib/grpc_aio/hellostreamingworld_pb2.py new file mode 100644 index 00000000000..6c6881cca3a --- /dev/null +++ b/tests/contrib/grpc_aio/hellostreamingworld_pb2.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: hellostreamingworld.proto +"""Generated protocol buffer code.""" +from ddtrace.internal.compat import PYTHON_VERSION_INFO + + +if PYTHON_VERSION_INFO > (3, 7): + from google.protobuf import descriptor as _descriptor + from google.protobuf import descriptor_pool as _descriptor_pool + from google.protobuf import symbol_database as _symbol_database + from google.protobuf.internal import builder as _builder + + # @@protoc_insertion_point(imports) + + _sym_db = _symbol_database.Default() + + DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x19hellostreamingworld.proto\x12\x13hellostreamingworld"3\n\x0cHelloRequest\x12\x0c\n\x04' + + b'name\x18\x01 \x01(\t\x12\x15\n\rnum_greetings\x18\x02 \x01(\t"\x1d\n\nHelloReply\x12\x0f\n\x07' + + b"message\x18\x01 \x01(\t2b\n\x0cMultiGreeter\x12R\n\x08sayHello\x12!.hellostreamingworld.HelloRequest" + + b'\x1a\x1f.hellostreamingworld.HelloReply"\x00\x30\x01\x42\x0f\n\x07\x65x.grpc\xa2\x02\x03HSWb\x06proto3' + ) + + _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) + _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "hellostreamingworld_pb2", globals()) + if _descriptor._USE_C_DESCRIPTORS is False: + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b"\n\007ex.grpc\242\002\003HSW" + _HELLOREQUEST._serialized_start = 50 # noqa: F821 + _HELLOREQUEST._serialized_end = 101 # noqa: F821 + _HELLOREPLY._serialized_start = 103 # noqa: F821 + _HELLOREPLY._serialized_end = 132 # noqa: F821 + _MULTIGREETER._serialized_start = 134 # noqa: F821 + _MULTIGREETER._serialized_end = 232 # noqa: F821 + # @@protoc_insertion_point(module_scope) diff --git a/tests/contrib/grpc_aio/hellostreamingworld_pb2.pyi b/tests/contrib/grpc_aio/hellostreamingworld_pb2.pyi new file mode 100644 index 00000000000..8c14f38a1f6 --- /dev/null +++ b/tests/contrib/grpc_aio/hellostreamingworld_pb2.pyi @@ -0,0 +1,23 @@ +# isort: off +from typing import ClassVar as _ClassVar +from typing import Optional as _Optional + +from ddtrace.internal.compat import PYTHON_VERSION_INFO + +if PYTHON_VERSION_INFO > (3, 7): + from google.protobuf import descriptor as _descriptor + from google.protobuf import message as _message + + DESCRIPTOR: _descriptor.FileDescriptor + class HelloReply(_message.Message): + __slots__ = ["message"] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + message: str + def __init__(self, message: _Optional[str] = ...) -> None: ... + class HelloRequest(_message.Message): + __slots__ = ["name", "num_greetings"] + NAME_FIELD_NUMBER: _ClassVar[int] + NUM_GREETINGS_FIELD_NUMBER: _ClassVar[int] + name: str + num_greetings: str + def __init__(self, name: _Optional[str] = ..., num_greetings: _Optional[str] = ...) -> None: ... diff --git a/tests/contrib/grpc_aio/hellostreamingworld_pb2_grpc.py b/tests/contrib/grpc_aio/hellostreamingworld_pb2_grpc.py new file mode 100644 index 00000000000..338edb0beac --- /dev/null +++ b/tests/contrib/grpc_aio/hellostreamingworld_pb2_grpc.py @@ -0,0 +1,77 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from ddtrace.internal.compat import PYTHON_VERSION_INFO + + +if PYTHON_VERSION_INFO > (3, 7): + from tests.contrib.grpc_aio import hellostreamingworld_pb2 as hellostreamingworld__pb2 + + class MultiGreeterStub(object): + """The greeting service definition.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.sayHello = channel.unary_stream( + "/hellostreamingworld.MultiGreeter/sayHello", + request_serializer=hellostreamingworld__pb2.HelloRequest.SerializeToString, + response_deserializer=hellostreamingworld__pb2.HelloReply.FromString, + ) + + class MultiGreeterServicer(object): + """The greeting service definition.""" + + def sayHello(self, request, context): + """Sends multiple greetings""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def add_MultiGreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + "sayHello": grpc.unary_stream_rpc_method_handler( + servicer.sayHello, + request_deserializer=hellostreamingworld__pb2.HelloRequest.FromString, + response_serializer=hellostreamingworld__pb2.HelloReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler("hellostreamingworld.MultiGreeter", rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + # This class is part of an EXPERIMENTAL API. + class MultiGreeter(object): + """The greeting service definition.""" + + @staticmethod + def sayHello( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/hellostreamingworld.MultiGreeter/sayHello", + hellostreamingworld__pb2.HelloRequest.SerializeToString, + hellostreamingworld__pb2.HelloReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/tests/contrib/grpc_aio/test_grpc_aio.py b/tests/contrib/grpc_aio/test_grpc_aio.py index 09158def739..4e783bf2f97 100644 --- a/tests/contrib/grpc_aio/test_grpc_aio.py +++ b/tests/contrib/grpc_aio/test_grpc_aio.py @@ -16,18 +16,30 @@ from ddtrace.contrib.grpc import unpatch from ddtrace.contrib.grpc.patch import GRPC_AIO_PIN_MODULE_CLIENT from ddtrace.contrib.grpc.patch import GRPC_AIO_PIN_MODULE_SERVER +from ddtrace.contrib.grpc.utils import _parse_rpc_repr_string +from ddtrace.internal.compat import PYTHON_VERSION_INFO import ddtrace.vendor.packaging.version as packaging_version from tests.contrib.grpc.hello_pb2 import HelloReply from tests.contrib.grpc.hello_pb2 import HelloRequest from tests.contrib.grpc.hello_pb2_grpc import HelloServicer from tests.contrib.grpc.hello_pb2_grpc import HelloStub from tests.contrib.grpc.hello_pb2_grpc import add_HelloServicer_to_server + + +if PYTHON_VERSION_INFO > (3, 7): + from tests.contrib.grpc_aio.hellostreamingworld_pb2 import HelloReply as HelloReplyStream + from tests.contrib.grpc_aio.hellostreamingworld_pb2 import HelloRequest as HelloRequestStream + from tests.contrib.grpc_aio.hellostreamingworld_pb2_grpc import MultiGreeterServicer + from tests.contrib.grpc_aio.hellostreamingworld_pb2_grpc import MultiGreeterStub + from tests.contrib.grpc_aio.hellostreamingworld_pb2_grpc import add_MultiGreeterServicer_to_server + from tests.utils import DummyTracer from tests.utils import assert_is_measured from tests.utils import override_config _GRPC_PORT = 50531 +NUMBER_OF_REPLY = 10 class _CoroHelloServicer(HelloServicer): @@ -149,6 +161,39 @@ def SayHelloRepeatedly(self, request_iterator, context): yield HelloReply(message="Good bye") +if PYTHON_VERSION_INFO > (3, 7): + + class Greeter(MultiGreeterServicer): + async def sayHello(self, request, context): + for i in range(NUMBER_OF_REPLY): + yield HelloReplyStream(message=f"Hello number {i}, {request.name}!") + + class DummyClientInterceptor(aio.UnaryUnaryClientInterceptor): + async def intercept_unary_unary(self, continuation, client_call_details, request): + undone_call = await continuation(client_call_details, request) + return await undone_call + + def add_done_callback(self, unused_callback): + pass + + @pytest.fixture + async def async_server_info(request, tracer, event_loop): + _ServerInfo = namedtuple("_ServerInfo", ("target", "abort_supported")) + _server = grpc.aio.server() + add_MultiGreeterServicer_to_server(Greeter(), _server) + _servicer = request.param + target = f"localhost:{_GRPC_PORT}" + _server.add_insecure_port(target) + # interceptor can not catch AbortError for sync servicer + abort_supported = not isinstance(_servicer, (_SyncHelloServicer,)) + + await _server.start() + wait_task = event_loop.create_task(_server.wait_for_termination()) + yield _ServerInfo(target, abort_supported) + await _server.stop(grace=None) + await wait_task + + @pytest.fixture(autouse=True) def patch_grpc_aio(): patch() @@ -173,7 +218,6 @@ async def server_info(request, tracer, event_loop): tracer fixture is imported to make sure the tracer is pinned to the modules. """ _ServerInfo = namedtuple("_ServerInfo", ("target", "abort_supported")) - _servicer = request.param target = f"localhost:{_GRPC_PORT}" _server = _create_server(_servicer, target) @@ -198,16 +242,16 @@ def _get_spans(tracer): return tracer._writer.spans -def _check_client_span(span, service, method_name, method_kind): +def _check_client_span(span, service, method_name, method_kind, resource="helloworld.Hello"): assert_is_measured(span) assert span.name == "grpc" - assert span.resource == "/helloworld.Hello/{}".format(method_name) + assert span.resource == "/{}/{}".format(resource, method_name) assert span.service == service assert span.error == 0 assert span.span_type == "grpc" - assert span.get_tag("grpc.method.path") == "/helloworld.Hello/{}".format(method_name) - assert span.get_tag("grpc.method.package") == "helloworld" - assert span.get_tag("grpc.method.service") == "Hello" + assert span.get_tag("grpc.method.path") == "/{}/{}".format(resource, method_name) + assert span.get_tag("grpc.method.package") == resource.split(".")[0] + assert span.get_tag("grpc.method.service") == resource.split(".")[1] assert span.get_tag("grpc.method.name") == method_name assert span.get_tag("grpc.method.kind") == method_kind assert span.get_tag("grpc.status.code") == "StatusCode.OK" @@ -217,16 +261,16 @@ def _check_client_span(span, service, method_name, method_kind): assert span.get_tag("span.kind") == "client" -def _check_server_span(span, service, method_name, method_kind): +def _check_server_span(span, service, method_name, method_kind, resource="helloworld.Hello"): assert_is_measured(span) assert span.name == "grpc" - assert span.resource == "/helloworld.Hello/{}".format(method_name) + assert span.resource == "/{}/{}".format(resource, method_name) assert span.service == service assert span.error == 0 assert span.span_type == "grpc" - assert span.get_tag("grpc.method.path") == "/helloworld.Hello/{}".format(method_name) - assert span.get_tag("grpc.method.package") == "helloworld" - assert span.get_tag("grpc.method.service") == "Hello" + assert span.get_tag("grpc.method.path") == "/{}/{}".format(resource, method_name) + assert span.get_tag("grpc.method.package") == resource.split(".")[0] + assert span.get_tag("grpc.method.service") == resource.split(".")[1] assert span.get_tag("grpc.method.name") == method_name assert span.get_tag("grpc.method.kind") == method_kind assert span.get_tag("component") == "grpc_aio_server" @@ -975,3 +1019,147 @@ async def test_client_streaming(server_info, tracer): out, err, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env) assert status == 0, (err.decode(), out.decode()) assert err == b"", err.decode() + + +class StreamInterceptor(grpc.aio.UnaryStreamClientInterceptor): + async def intercept_unary_stream(self, continuation, call_details, request): + response_iterator = await continuation(call_details, request) + return response_iterator + + +if PYTHON_VERSION_INFO > (3, 7): + + async def run_streaming_example(server_info, use_generator=False): + i = 0 + async with grpc.aio.insecure_channel(server_info.target, interceptors=[StreamInterceptor()]) as channel: + stub = MultiGreeterStub(channel) + + # Read from an async generator + if use_generator: + async for response in stub.sayHello(HelloRequestStream(name="you")): + assert response.message == "Hello number {}, you!".format(i) + i += 1 + + # Direct read from the stub + else: + hello_stream = stub.sayHello(HelloRequestStream(name="will")) + while True: + response = await hello_stream.read() + if response == grpc.aio.EOF: + break + assert response.message == "Hello number {}, will!".format(i) + i += 1 + + @pytest.mark.asyncio + @pytest.mark.skip( + "Bug/error from grpc when adding an async streaming client interceptor throws StopAsyncIteration. Issue can be \ + found at: https://github.com/DataDog/dd-trace-py/issues/9139" + ) + @pytest.mark.skipif( + PYTHON_VERSION_INFO < (3, 6), + reason="Protobuf package versino needed for stubs is not supported by python 3.6", + ) + @pytest.mark.parametrize("async_server_info", [_CoroHelloServicer()], indirect=True) + async def test_async_streaming_direct_read(async_server_info, tracer): + await run_streaming_example(async_server_info) + + spans = _get_spans(tracer) + assert len(spans) == 2 + client_span, server_span = spans + + # No error because cancelled after execution + _check_client_span(client_span, "grpc-aio-client", "SayHelloRepeatedly", "bidi_streaming") + _check_server_span(server_span, "grpc-aio-server", "SayHelloRepeatedly", "bidi_streaming") + + @pytest.mark.asyncio + @pytest.mark.skipif( + PYTHON_VERSION_INFO < (3, 6), + reason="Protobuf package versino needed for stubs is not supported by python 3.6", + ) + @pytest.mark.parametrize("async_server_info", [_CoroHelloServicer()], indirect=True) + async def test_async_streaming_generator(async_server_info, tracer): + await run_streaming_example(async_server_info, use_generator=True) + + spans = _get_spans(tracer) + assert len(spans) == 2 + client_span, server_span = spans + + # No error because cancelled after execution + _check_client_span( + client_span, "grpc-aio-client", "sayHello", "server_streaming", "hellostreamingworld.MultiGreeter" + ) + _check_server_span( + server_span, "grpc-aio-server", "sayHello", "server_streaming", "hellostreamingworld.MultiGreeter" + ) + + +repr_test_cases = [ + { + "rpc_string": 'status = StatusCode.OK, details = "Everything is fine"', + "expected_code": grpc.StatusCode.OK, + "expected_details": "Everything is fine", + "expect_error": False, + }, + { + "rpc_string": 'status = StatusCode.ABORTED, details = "Everything is not fine"', + "expected_code": grpc.StatusCode.ABORTED, + "expected_details": "Everything is not fine", + "expect_error": False, + }, + { + "rpc_string": "status = , details = ", + "expected_code": None, + "expected_details": None, + "expect_error": True, + }, + { + "rpc_string": 'details = "Everything is fine"', + "expected_code": None, + "expected_details": None, + "expect_error": True, + }, + { + "rpc_string": "status = StatusCode.ABORTED", + "expected_code": None, + "expected_details": None, + "expect_error": True, + }, + { + "rpc_string": 'status = StatusCode.INVALID_STATUS_CODE_SAD, details = "Everything is fine"', + "expected_code": None, + "expected_details": None, + "expect_error": True, + }, + { + "rpc_string": ' status = StatusCode.CANCELLED , details = "Everything is not fine" ', + "expected_code": grpc.StatusCode.CANCELLED, + "expected_details": "Everything is not fine", + "expect_error": False, + }, + { + "rpc_string": "status=StatusCode.OK details='Everything is fine'", + "expected_code": None, + "expected_details": None, + "expect_error": True, + }, +] + + +@pytest.mark.parametrize("case", repr_test_cases) +def test_parse_rpc_repr_string(case): + try: + code, details = _parse_rpc_repr_string(case["rpc_string"], grpc) + assert not case[ + "expect_error" + ], f"Test case with repr string: {case['rpc_string']} expected error but got result" + assert ( + code == case["expected_code"] + ), f"Test case with repr string: {case['rpc_string']} expected code {case['expected_code']} but got {code}" + assert details == case["expected_details"], ( + f"Test case with repr string: {case['rpc_string']} expected details {case['expected_details']} but" + f"got {details}" + ) + except ValueError as e: + assert case[ + "expect_error" + ], f"Test case with repr string: {case['rpc_string']} did not expect error but got {e}"