Skip to content

Commit

Permalink
refactor: minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
hpopp committed May 4, 2018
1 parent 3de4248 commit 375c25d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 37 deletions.
4 changes: 3 additions & 1 deletion lib/connection/flow_control.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
defmodule Kadabra.Connection.FlowControl do
@moduledoc false

@default_window_size 65_535

defstruct queue: :queue.new(),
stream_id: 1,
active_stream_count: 0,
active_streams: MapSet.new(),
window: 65_535,
window: @default_window_size,
settings: %Kadabra.Connection.Settings{}

alias Kadabra.{Config, Connection, StreamSupervisor}
Expand Down
10 changes: 6 additions & 4 deletions lib/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ defmodule Kadabra.Stream do
def new(%Config{} = config, %Settings{} = settings, stream_id) do
flow_opts = [
stream_id: stream_id,
socket: config.socket,
window: settings.initial_window_size,
max_frame_size: settings.max_frame_size
]
Expand Down Expand Up @@ -123,7 +124,7 @@ defmodule Kadabra.Stream do
flow =
stream.flow
|> Stream.FlowControl.increment_window(inc)
|> Stream.FlowControl.process(stream.config.socket)
|> Stream.FlowControl.process()

{:keep_state, %{stream | flow: flow}}
end
Expand Down Expand Up @@ -180,8 +181,9 @@ defmodule Kadabra.Stream do
{:next_state, @reserved_remote, stream}
end

def recv(from, %Continuation{} = frame, _state, stream) do
{:ok, headers} = Hpack.decode(stream.decoder, frame.header_block_fragment)
def recv(from, %Continuation{} = frame, _state, %{config: config} = stream) do
{:ok, headers} = Hpack.decode(config.ref, frame.header_block_fragment)

:gen_statem.reply(from, :ok)

stream = %Stream{stream | headers: stream.headers ++ headers}
Expand Down Expand Up @@ -257,7 +259,7 @@ defmodule Kadabra.Stream do
if payload do
stream.flow
|> Stream.FlowControl.add(payload)
|> Stream.FlowControl.process(stream.config.socket)
|> Stream.FlowControl.process()
else
stream.flow
end
Expand Down
72 changes: 40 additions & 32 deletions lib/stream/flow_control.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
defmodule Kadabra.Stream.FlowControl do
@moduledoc false

@default_window_size 65_535

defstruct queue: :queue.new(),
window: 56_536,
window: @default_window_size,
max_frame_size: 16_384,
socket: nil,
stream_id: nil

alias Kadabra.{Encodable, Frame}
Expand All @@ -12,6 +15,7 @@ defmodule Kadabra.Stream.FlowControl do
@type t :: %__MODULE__{
max_frame_size: non_neg_integer,
queue: :queue.queue(binary),
socket: pid,
window: integer
}

Expand All @@ -34,8 +38,9 @@ defmodule Kadabra.Stream.FlowControl do
@spec new(Keyword.t()) :: t
def new(opts \\ []) do
%__MODULE__{
stream_id: opts[:stream_id],
window: Keyword.get(opts, :window, 56_536),
stream_id: Keyword.get(opts, :stream_id),
socket: Keyword.get(opts, :socket),
window: Keyword.get(opts, :window, @default_window_size),
max_frame_size: Keyword.get(opts, :max_frame_size, 16_384)
}
end
Expand All @@ -60,63 +65,66 @@ defmodule Kadabra.Stream.FlowControl do
## Examples
iex> process(%Kadabra.Stream.FlowControl{queue: :queue.new()}, self())
iex> process(%Kadabra.Stream.FlowControl{queue: :queue.new()})
%Kadabra.Stream.FlowControl{queue: {[], []}}
iex> queue = :queue.in({:send, "test"}, :queue.new())
iex> process(%Kadabra.Stream.FlowControl{queue: queue,
...> window: -20}, self())
iex> process(%Kadabra.Stream.FlowControl{queue: queue, window: -20})
%Kadabra.Stream.FlowControl{queue: {[send: "test"], []}, window: -20}
"""
@spec process(t, sock) :: t
def process(%{window: window} = flow_control, _sock) when window <= 0 do
@spec process(t) :: t
def process(%{window: window} = flow_control) when window <= 0 do
flow_control
end

def process(%{queue: queue} = flow_control, socket) do
def process(%{queue: queue} = flow_control) do
case :queue.out(queue) do
{{:value, bin}, queue} ->
flow_control
|> Map.put(:queue, queue)
|> do_process(socket, bin)
|> do_process(bin)

{:empty, _queue} ->
flow_control
end
end

def do_process(flow_control, socket, bin) do
defp do_process(%{window: window} = flow, bin) when byte_size(bin) > window do
%{
queue: queue,
max_frame_size: max_size,
window: window,
socket: socket,
stream_id: stream_id
} = flow_control
} = flow

size = byte_size(bin)
{chunk, rem_bin} = :erlang.split_binary(bin, window)

if size > window do
{chunk, rem_bin} = :erlang.split_binary(bin, window)
max_size
|> split_packet(chunk)
|> send_partial_data(socket, stream_id)

max_size
|> split_packet(chunk)
|> send_partial_data(socket, stream_id)
queue = :queue.in_r(rem_bin, queue)

queue = :queue.in_r(rem_bin, queue)
flow
|> Map.put(:queue, queue)
|> Map.put(:window, 0)
|> process()
end

defp do_process(%{window: window} = flow_control, bin) do
%{
max_frame_size: max_size,
socket: socket,
stream_id: stream_id
} = flow_control

flow_control
|> Map.put(:queue, queue)
|> Map.put(:window, 0)
|> process(socket)
else
max_size
|> split_packet(bin)
|> send_data(socket, stream_id)
max_size
|> split_packet(bin)
|> send_data(socket, stream_id)

flow_control
|> Map.put(:window, window - size)
|> process(socket)
end
flow_control
|> Map.put(:window, window - byte_size(bin))
|> process()
end

def send_partial_data([], _socket, _stream_id), do: :ok
Expand Down

0 comments on commit 375c25d

Please sign in to comment.