Skip to content

Commit

Permalink
Merge branch 'main' into v2/metadata-structured-swift
Browse files Browse the repository at this point in the history
  • Loading branch information
glbrntt authored Nov 18, 2024
2 parents b45e2bd + dd22b39 commit f349abd
Show file tree
Hide file tree
Showing 10 changed files with 583 additions and 84 deletions.
9 changes: 5 additions & 4 deletions Sources/GRPCCore/Call/Client/ClientInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
/// received from the transport. They are typically used for cross-cutting concerns like injecting
/// metadata, validating messages, logging additional data, and tracing.
///
/// Interceptors are registered with a client and apply to all RPCs. If you need to modify the
/// behavior of an interceptor on a per-RPC basis then you can use the
/// ``ClientContext/descriptor`` to determine which RPC is being called and
/// conditionalise behavior accordingly.
/// Interceptors are registered with the server via ``ClientInterceptorPipelineOperation``s.
/// You may register them for all services registered with a server, for RPCs directed to specific services, or
/// for RPCs directed to specific methods. If you need to modify the behavior of an interceptor on a
/// per-RPC basis in more detail, then you can use the ``ClientContext/descriptor`` to determine
/// which RPC is being called and conditionalise behavior accordingly.
///
/// - TODO: Update example and documentation to show how to register an interceptor.
///
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/// A `ClientInterceptorPipelineOperation` describes to which RPCs a client interceptor should be applied.
///
/// You can configure a client interceptor to be applied to:
/// - all RPCs and services;
/// - requests directed only to specific services; or
/// - requests directed only to specific methods (of a specific service).
///
/// - SeeAlso: ``ClientInterceptor`` for more information on client interceptors, and
/// ``ServerInterceptorPipelineOperation`` for the server-side version of this type.
public struct ClientInterceptorPipelineOperation: Sendable {
/// The subject of a ``ClientInterceptorPipelineOperation``.
/// The subject of an interceptor can either be all services and methods, only specific services, or only specific methods.
public struct Subject: Sendable {
internal enum Wrapped: Sendable {
case all
case services(Set<ServiceDescriptor>)
case methods(Set<MethodDescriptor>)
}

private let wrapped: Wrapped

/// An operation subject specifying an interceptor that applies to all RPCs across all services will be registered with this client.
public static var all: Self { .init(wrapped: .all) }

/// An operation subject specifying an interceptor that will be applied only to RPCs directed to the specified services.
/// - Parameters:
/// - services: The list of service names for which this interceptor should intercept RPCs.
/// - Returns: A ``ClientInterceptorPipelineOperation``.
public static func services(_ services: Set<ServiceDescriptor>) -> Self {
Self(wrapped: .services(services))
}

/// An operation subject specifying an interceptor that will be applied only to RPCs directed to the specified service methods.
/// - Parameters:
/// - methods: The list of method descriptors for which this interceptor should intercept RPCs.
/// - Returns: A ``ClientInterceptorPipelineOperation``.
public static func methods(_ methods: Set<MethodDescriptor>) -> Self {
Self(wrapped: .methods(methods))
}

@usableFromInline
internal func applies(to descriptor: MethodDescriptor) -> Bool {
switch self.wrapped {
case .all:
return true

case .services(let services):
return services.map({ $0.fullyQualifiedService }).contains(descriptor.service)

case .methods(let methods):
return methods.contains(descriptor)
}
}
}

/// The interceptor specified for this operation.
public let interceptor: any ClientInterceptor

@usableFromInline
internal let subject: Subject

private init(interceptor: any ClientInterceptor, appliesTo: Subject) {
self.interceptor = interceptor
self.subject = appliesTo
}

/// Create an operation, specifying which ``ClientInterceptor`` to apply and to which ``Subject``.
/// - Parameters:
/// - interceptor: The ``ClientInterceptor`` to register with the client.
/// - subject: The ``Subject`` to which the `interceptor` applies.
/// - Returns: A ``ClientInterceptorPipelineOperation``.
public static func apply(_ interceptor: any ClientInterceptor, to subject: Subject) -> Self {
Self(interceptor: interceptor, appliesTo: subject)
}

/// Returns whether this ``ClientInterceptorPipelineOperation`` applies to the given `descriptor`.
/// - Parameter descriptor: A ``MethodDescriptor`` for which to test whether this interceptor applies.
/// - Returns: `true` if this interceptor applies to the given `descriptor`, or `false` otherwise.
@inlinable
internal func applies(to descriptor: MethodDescriptor) -> Bool {
self.subject.applies(to: descriptor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ struct ServerRPCExecutor {
/// - stream: The accepted stream to execute the RPC on.
/// - deserializer: A deserializer for messages received from the client.
/// - serializer: A serializer for messages to send to the client.
/// - interceptors: Server interceptors to apply to this RPC.
/// - interceptors: Server interceptors to apply to this RPC. The
/// interceptors will be called in the order of the array.
/// - handler: A handler which turns the request into a response.
@inlinable
static func execute<Input, Output>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
/// - requests directed only to specific services registered with your server; or
/// - requests directed only to specific methods (of a specific service).
///
/// - SeeAlso: ``ServerInterceptor`` for more information on server interceptors.
/// - SeeAlso: ``ServerInterceptor`` for more information on server interceptors, and
/// ``ClientInterceptorPipelineOperation`` for the client-side version of this type.
public struct ServerInterceptorPipelineOperation: Sendable {
/// The subject of a ``ServerInterceptorPipelineOperation``.
/// The subject of an interceptor can either be all services and methods, only specific services, or only specific methods.
Expand Down
52 changes: 36 additions & 16 deletions Sources/GRPCCore/Call/Server/ServerResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,25 @@ extension ServerResponse {
self.accepted = .failure(error)
}

/// Returns the metadata to be sent to the client at the start of the response.
///
/// For rejected RPCs (in other words, where ``accepted`` is `failure`) the metadata is empty.
/// The metadata to be sent to the client at the start of the response.
public var metadata: Metadata {
switch self.accepted {
case let .success(contents):
return contents.metadata
case .failure:
return [:]
get {
switch self.accepted {
case let .success(contents):
return contents.metadata
case .failure(let error):
return error.metadata
}
}
set {
switch self.accepted {
case var .success(contents):
contents.metadata = newValue
self.accepted = .success(contents)
case var .failure(error):
error.metadata = newValue
self.accepted = .failure(error)
}
}
}

Expand Down Expand Up @@ -303,15 +313,25 @@ extension StreamingServerResponse {
self.accepted = .failure(error)
}

/// Returns metadata received from the server at the start of the response.
///
/// For rejected RPCs (in other words, where ``accepted`` is `failure`) the metadata is empty.
/// The metadata to be sent to the client at the start of the response.
public var metadata: Metadata {
switch self.accepted {
case let .success(contents):
return contents.metadata
case .failure:
return [:]
get {
switch self.accepted {
case let .success(contents):
return contents.metadata
case .failure(let error):
return error.metadata
}
}
set {
switch self.accepted {
case var .success(contents):
contents.metadata = newValue
self.accepted = .success(contents)
case var .failure(error):
error.metadata = newValue
self.accepted = .failure(error)
}
}
}
}
Expand Down
88 changes: 70 additions & 18 deletions Sources/GRPCCore/GRPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,12 @@ public final class GRPCClient: Sendable {
/// The transport which provides a bidirectional communication channel with the server.
private let transport: any ClientTransport

/// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
///
/// The order in which interceptors are added reflects the order in which they are called. The
/// first interceptor added will be the first interceptor to intercept each request. The last
/// interceptor added will be the final interceptor to intercept each request before calling
/// the appropriate handler.
private let interceptors: [any ClientInterceptor]

/// The current state of the client.
private let state: Mutex<State>
private let stateMachine: Mutex<StateMachine>

/// The state of the client.
private enum State: Sendable {

/// The client hasn't been started yet. Can transition to `running` or `stopped`.
case notStarted
/// The client is running and can send RPCs. Can transition to `stopping`.
Expand Down Expand Up @@ -187,22 +180,79 @@ public final class GRPCClient: Sendable {
}
}

private struct StateMachine {
var state: State

private let interceptorPipeline: [ClientInterceptorPipelineOperation]

/// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
///
/// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
/// This caching is done to avoid having to compute the applicable interceptors for each request made.
///
/// The order in which interceptors are added reflects the order in which they are called. The
/// first interceptor added will be the first interceptor to intercept each request. The last
/// interceptor added will be the final interceptor to intercept each request before calling
/// the appropriate handler.
var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]

init(interceptorPipeline: [ClientInterceptorPipelineOperation]) {
self.state = .notStarted
self.interceptorPipeline = interceptorPipeline
self.interceptorsPerMethod = [:]
}

mutating func checkExecutableAndGetApplicableInterceptors(
for method: MethodDescriptor
) throws -> [any ClientInterceptor] {
try self.state.checkExecutable()

guard let applicableInterceptors = self.interceptorsPerMethod[method] else {
let applicableInterceptors = self.interceptorPipeline
.filter { $0.applies(to: method) }
.map { $0.interceptor }
self.interceptorsPerMethod[method] = applicableInterceptors
return applicableInterceptors
}

return applicableInterceptors
}
}

/// Creates a new client with the given transport, interceptors and configuration.
///
/// - Parameters:
/// - transport: The transport used to establish a communication channel with a server.
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
/// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
public init(
convenience public init(
transport: some ClientTransport,
interceptors: [any ClientInterceptor] = []
) {
self.init(
transport: transport,
interceptorPipeline: interceptors.map { .apply($0, to: .all) }
)
}

/// Creates a new client with the given transport, interceptors and configuration.
///
/// - Parameters:
/// - transport: The transport used to establish a communication channel with a server.
/// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting
/// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
/// The order in which interceptors are added reflects the order in which they are called.
/// The first interceptor added will be the first interceptor to intercept each request.
/// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
public init(
transport: some ClientTransport,
interceptorPipeline: [ClientInterceptorPipelineOperation]
) {
self.transport = transport
self.interceptors = interceptors
self.state = Mutex(.notStarted)
self.stateMachine = Mutex(StateMachine(interceptorPipeline: interceptorPipeline))
}

/// Start the client.
Expand All @@ -213,11 +263,11 @@ public final class GRPCClient: Sendable {
/// The client, and by extension this function, can only be run once. If the client is already
/// running or has already been closed then a ``RuntimeError`` is thrown.
public func run() async throws {
try self.state.withLock { try $0.run() }
try self.stateMachine.withLock { try $0.state.run() }

// When this function exits the client must have stopped.
defer {
self.state.withLock { $0.stopped() }
self.stateMachine.withLock { $0.state.stopped() }
}

do {
Expand All @@ -237,7 +287,7 @@ public final class GRPCClient: Sendable {
/// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
/// executing ``run()`` if you want to abruptly stop in-flight RPCs.
public func beginGracefulShutdown() {
let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
let wasRunning = self.stateMachine.withLock { $0.state.beginGracefulShutdown() }
if wasRunning {
self.transport.beginGracefulShutdown()
}
Expand Down Expand Up @@ -356,7 +406,9 @@ public final class GRPCClient: Sendable {
options: CallOptions,
handler: @Sendable @escaping (StreamingClientResponse<Response>) async throws -> ReturnValue
) async throws -> ReturnValue {
try self.state.withLock { try $0.checkExecutable() }
let applicableInterceptors = try self.stateMachine.withLock {
try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
}
let methodConfig = self.transport.config(forMethod: descriptor)
var options = options
options.formUnion(with: methodConfig)
Expand All @@ -368,7 +420,7 @@ public final class GRPCClient: Sendable {
serializer: serializer,
deserializer: deserializer,
transport: self.transport,
interceptors: self.interceptors,
interceptors: applicableInterceptors,
handler: handler
)
}
Expand Down
Loading

0 comments on commit f349abd

Please sign in to comment.