forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
codec_server.py
52 lines (42 loc) · 1.72 KB
/
codec_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from functools import partial
from typing import Awaitable, Callable, Iterable, List
from aiohttp import hdrs, web
from google.protobuf import json_format
from temporalio.api.common.v1 import Payload, Payloads
from encryption.codec import EncryptionCodec
def build_codec_server() -> web.Application:
# Cors handler
async def cors_options(req: web.Request) -> web.Response:
resp = web.Response()
if req.headers.get(hdrs.ORIGIN) == "http://localhost:8233":
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = "http://localhost:8233"
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_METHODS] = "POST"
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_HEADERS] = "content-type,x-namespace"
return resp
# General purpose payloads-to-payloads
async def apply(
fn: Callable[[Iterable[Payload]], Awaitable[List[Payload]]], req: web.Request
) -> web.Response:
# Read payloads as JSON
assert req.content_type == "application/json"
payloads = json_format.Parse(await req.read(), Payloads())
# Apply
payloads = Payloads(payloads=await fn(payloads.payloads))
# Apply CORS and return JSON
resp = await cors_options(req)
resp.content_type = "application/json"
resp.text = json_format.MessageToJson(payloads)
return resp
# Build app
codec = EncryptionCodec()
app = web.Application()
app.add_routes(
[
web.post("/encode", partial(apply, codec.encode)),
web.post("/decode", partial(apply, codec.decode)),
web.options("/decode", cors_options),
]
)
return app
if __name__ == "__main__":
web.run_app(build_codec_server(), host="127.0.0.1", port=8081)