Skip to content

Commit

Permalink
Introducing distributed mutations (#7)
Browse files Browse the repository at this point in the history
* Introducing distributed mutations

* Fix CI

* Fix handlers tests

* Fix JS client disconnect callback

* Add callbacks to JS Store instance

* Fix `terminate` callback

* Fix registry cleanup when store pid is `:DOWN`

* Code cleanup
  • Loading branch information
drozdzynski authored May 6, 2024
1 parent 5817666 commit 3e5ef05
Show file tree
Hide file tree
Showing 30 changed files with 782 additions and 370 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
${{ runner.os }}-${{matrix.otp}}-${{matrix.elixir}}-mix-
- name: Install Dependencies
run: |
epmd -daemon
mix local.rebar --force
mix local.hex --force
mix deps.get
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# storex

## 0.4.0

- **[BREAKING]** `Storex.mutate/3` is no longer based on `session_id`
- **[BREAKING]** `Store.init/2` callback now need to return `{:ok, state} | {:ok, state, key} | {:error, reason}`
- **[BREAKING]** Remove custom `Registry` logic
- **[BREAKING]** Remove `connection` callback from javascript client
- New registry mechanism provides distributed mutations across the cluster
- Fix `terminate` callback in `Storex.Handler.Plug`
- Added three callbacks to frontend client `onConnected`, `onError` and `onDisconnected`

## 0.3.0

- **[BREAKING]** Rename Cowbow handler module from `Storex.Socket.Handler` to `Storex.Handler.Cowboy`
Expand Down
36 changes: 20 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,20 @@ end

### Create store

To create a store you need to create new elixir module with `init/2` which is called when a page is loaded, every time websocket is connected it generates session_id and passes it as the first argument, params are from Javascript store declaration. Next, you can declare `mutation/5` where the first argument is mutation name, second is data passed to mutation, next two params are same like in `init/2`, the last one is the current state of the store.
To create a store you need to create new elixir module with `init/2` which is called when a page is loaded, every time websocket is connected it generates session_id and passes it as the first argument, params are from Javascript store declaration. `init/2` callback need to return one of this tuples:

- `{:ok, state}` - for initial state
- `{:ok, state, key}` - for initial state with `key` which can be used as selector for future mutations
- `{:error, reason}` - to send error message to frontend on initialization

Next, you can declare `mutation/5` where the first argument is mutation name, second is data passed to mutation, next two params are same like in `init/2`, the last one is the current state of the store.

```elixir
defmodule ExampleApp.Store.Counter do
use Storex.Store

def init(session_id, params) do
0
{:ok, 0}
end

# `increase` is mutation name, `data` is payload from front-end, `session_id` is current session id of connecton, `initial_params` with which store was initialized, `state` is store current state.
Expand Down Expand Up @@ -98,8 +104,14 @@ const store = new Storex({
subscribe: (state) => {
console.log(state)
},
connection: (state) => {
console.log(state ? 'connected' : 'disconnected')
onConnected() {
console.log('connected')
},
onError(error) {
console.log('error', error)
},
onDisconnected(closeEvent) {
console.log('disconnected', closeEvent)
}
})
```
Expand All @@ -119,8 +131,10 @@ store.commit("set", 10)
Or directly from elixir:

```elixir
Storex.mutate(session_id, store, "increase")
Storex.mutate(session_id, store, "set", [10])
Storex.mutate(store, "increase", [])
Storex.mutate(store, "set", [10])
Storex.mutate(key, store, "increase", [])
Storex.mutate(key, store, "set", [10])
```

### Subscribe to store state changes
Expand All @@ -133,16 +147,6 @@ store.subscribe((state) => {
})
```

### Subscribe to store connection

You can subscribe to store connection state changes in javascript with function connection:

```javascript
store.connection((state) => {
console.log(state ? 'connected' : 'disconnected')
})
```

## Configuration

### Session id generation library
Expand Down
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ import Config
config :storex,
session_id_library: Nanoid,
registry: Storex.Registry.ETS

if Mix.env() == :test do
import_config "test.exs"
end
46 changes: 33 additions & 13 deletions lib/storex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,48 @@ defmodule Storex do
def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
{Storex.Registry.ETS, []},
{Storex.Supervisor, []}
]
children =
pg_children() ++
[
{Storex.PG, []},
{Storex.Registry, []},
{Storex.Supervisor, []}
]

Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__)
end

if Code.ensure_loaded?(:pg) do
defp pg_children() do
[%{id: :pg, start: {:pg, :start_link, [Storex.PG]}}]
end
else
defp pg_children() do
[]
end
end

@doc """
Mutate store from elixir.
Call mutation callback in store synchronously:
Invoke mutation callback globally across specified store asynchronously:
```elixir
Storex.mutate("d9ez7fgkp96", "ExampleApp.Store", "reload", ["user_id"])
Storex.mutate("ExampleApp.Store", "reload", ["params"])
```
"""
def mutate(session, store, mutation, payload \\ [])
when is_binary(session) and is_binary(store) do
Storex.Registry.session_pid(session)
|> case do
:undefined -> {:error, "Session #{session} not found."}
pid -> Kernel.send(pid, {:mutate, store, mutation, payload})
end
def mutate(store, mutation, payload) do
Storex.PG.broadcast({:mutate, store, mutation, payload})
end

@doc """
Mutate store from elixir.
Invoke mutation callback by specified key and store asynchronously:
```elixir
Storex.mutate("user_id", "ExampleApp.Store", "reload", ["params"])
```
"""
def mutate(key, store, mutation, payload) do
Storex.PG.broadcast({:mutate, key, store, mutation, payload})
end
end
6 changes: 1 addition & 5 deletions lib/storex/handler/cowboy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Storex.Handler.Cowboy do
def init(request, _state) do
session = Application.get_env(:storex, :session_id_library, Nanoid).generate()

Storex.Registry.register_session(session, request.pid)

{:cowboy_websocket, request, %{session: session, pid: request.pid}}
end

Expand All @@ -17,12 +15,10 @@ defmodule Storex.Handler.Cowboy do

def terminate(_reason, _req, %{session: session}) do
Storex.Registry.session_stores(session)
|> Enum.each(fn {session, store, _} ->
|> Enum.each(fn {store, _, session, _, _} ->
Storex.Supervisor.remove_store(session, store)
end)

Storex.Registry.unregister_session(session)

:ok
end

Expand Down
6 changes: 1 addition & 5 deletions lib/storex/handler/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@ defmodule Storex.Handler.Plug do
session = Application.get_env(:storex, :session_id_library, Nanoid).generate()
pid = self()

Storex.Registry.register_session(session, pid)

{:ok, %{session: session, pid: pid}}
end

def terminate(_reason, %{session: session}) do
Storex.Registry.session_stores(session)
|> Enum.each(fn {session, store, _} ->
|> Enum.each(fn {store, _, session, _, _} ->
Storex.Supervisor.remove_store(session, store)
end)

Storex.Registry.unregister_session(session)

:ok
end

Expand Down
77 changes: 77 additions & 0 deletions lib/storex/pg.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule Storex.PG do
@moduledoc false
use GenServer

@name :storex_pg

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: @name)
end

@impl true
def init(_) do
:ok = pg_join(@name)
{:ok, @name}
end

def broadcast(payload) do
pg_members(@name)
|> case do
{:error, _} ->
:error

pids ->
for pid <- pids do
send(pid, {:broadcast, payload})
end
end
end

@impl true
def handle_info({:broadcast, {:mutate, store, mutation, payload}}, state) do
Storex.Registry.get_store_instances({store, :_, :_, :_, :_})
|> Enum.map(fn {^store, _, _, session_pid, _} ->
Kernel.send(session_pid, {:mutate, store, mutation, payload})
end)

{:noreply, state}
end

@impl true
def handle_info({:broadcast, {:mutate, key, store, mutation, payload}}, state) do
Storex.Registry.get_store_instances({store, :_, :_, :_, key})
|> Enum.each(fn {^store, _, _, session_pid, ^key} ->
Kernel.send(session_pid, {:mutate, store, mutation, payload})
end)

{:noreply, state}
end

@impl true
def handle_info(_, state) do
{:noreply, state}
end

if Code.ensure_loaded?(:pg) do
defp pg_members(group) do
:pg.get_members(Storex.PG, group)
end
else
defp pg_members(group) do
:pg2.get_members({:storex, group})
end
end

if Code.ensure_loaded?(:pg) do
defp pg_join(group) do
:ok = :pg.join(Storex.PG, group, self())
end
else
defp pg_join(group) do
namespace = {:storex, group}
:ok = :pg2.create(namespace)
:ok = :pg2.join(namespace, self())
:ok
end
end
end
Loading

0 comments on commit 3e5ef05

Please sign in to comment.