Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing distributed mutations #7

Merged
merged 8 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
${{ runner.os }}-${{matrix.otp}}-${{matrix.elixir}}-mix-
- name: Install Dependencies
run: |
epmd -daemon
mix local.rebar --force
mix local.hex --force
mix deps.get
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# storex

## 0.4.0

- **[BREAKING]** `Storex.mutate/3` is no longer based on `session_id`
- **[BREAKING]** `Store.init/2` callback now need to return `{:ok, state} | {:ok, state, key} | {:error, reason}`
- **[BREAKING]** Remove custom `Registry` logic
- **[BREAKING]** Remove `connection` callback from javascript client
- New registry mechanism provides distributed mutations across the cluster
- Fix `terminate` callback in `Storex.Handler.Plug`
- Added three callbacks to frontend client `onConnected`, `onError` and `onDisconnected`

## 0.3.0

- **[BREAKING]** Rename Cowbow handler module from `Storex.Socket.Handler` to `Storex.Handler.Cowboy`
Expand Down
36 changes: 20 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,20 @@ end

### Create store

To create a store you need to create new elixir module with `init/2` which is called when a page is loaded, every time websocket is connected it generates session_id and passes it as the first argument, params are from Javascript store declaration. Next, you can declare `mutation/5` where the first argument is mutation name, second is data passed to mutation, next two params are same like in `init/2`, the last one is the current state of the store.
To create a store you need to create new elixir module with `init/2` which is called when a page is loaded, every time websocket is connected it generates session_id and passes it as the first argument, params are from Javascript store declaration. `init/2` callback need to return one of this tuples:

- `{:ok, state}` - for initial state
- `{:ok, state, key}` - for initial state with `key` which can be used as selector for future mutations
- `{:error, reason}` - to send error message to frontend on initialization

Next, you can declare `mutation/5` where the first argument is mutation name, second is data passed to mutation, next two params are same like in `init/2`, the last one is the current state of the store.

```elixir
defmodule ExampleApp.Store.Counter do
use Storex.Store

def init(session_id, params) do
0
{:ok, 0}
end

# `increase` is mutation name, `data` is payload from front-end, `session_id` is current session id of connecton, `initial_params` with which store was initialized, `state` is store current state.
Expand Down Expand Up @@ -98,8 +104,14 @@ const store = new Storex({
subscribe: (state) => {
console.log(state)
},
connection: (state) => {
console.log(state ? 'connected' : 'disconnected')
onConnected() {
console.log('connected')
},
onError(error) {
console.log('error', error)
},
onDisconnected(closeEvent) {
console.log('disconnected', closeEvent)
}
})
```
Expand All @@ -119,8 +131,10 @@ store.commit("set", 10)
Or directly from elixir:

```elixir
Storex.mutate(session_id, store, "increase")
Storex.mutate(session_id, store, "set", [10])
Storex.mutate(store, "increase", [])
Storex.mutate(store, "set", [10])
Storex.mutate(key, store, "increase", [])
Storex.mutate(key, store, "set", [10])
```

### Subscribe to store state changes
Expand All @@ -133,16 +147,6 @@ store.subscribe((state) => {
})
```

### Subscribe to store connection

You can subscribe to store connection state changes in javascript with function connection:

```javascript
store.connection((state) => {
console.log(state ? 'connected' : 'disconnected')
})
```

## Configuration

### Session id generation library
Expand Down
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ import Config
config :storex,
session_id_library: Nanoid,
registry: Storex.Registry.ETS

if Mix.env() == :test do
import_config "test.exs"
end
46 changes: 33 additions & 13 deletions lib/storex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,48 @@ defmodule Storex do
def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
{Storex.Registry.ETS, []},
{Storex.Supervisor, []}
]
children =
pg_children() ++
[
{Storex.PG, []},
{Storex.Registry, []},
{Storex.Supervisor, []}
]

Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__)
end

if Code.ensure_loaded?(:pg) do
defp pg_children() do
[%{id: :pg, start: {:pg, :start_link, [Storex.PG]}}]
end
else
defp pg_children() do
[]
end
end

@doc """
Mutate store from elixir.

Call mutation callback in store synchronously:
Invoke mutation callback globally across specified store asynchronously:
```elixir
Storex.mutate("d9ez7fgkp96", "ExampleApp.Store", "reload", ["user_id"])
Storex.mutate("ExampleApp.Store", "reload", ["params"])
```
"""
def mutate(session, store, mutation, payload \\ [])
when is_binary(session) and is_binary(store) do
Storex.Registry.session_pid(session)
|> case do
:undefined -> {:error, "Session #{session} not found."}
pid -> Kernel.send(pid, {:mutate, store, mutation, payload})
end
def mutate(store, mutation, payload) do
Storex.PG.broadcast({:mutate, store, mutation, payload})
end

@doc """
Mutate store from elixir.

Invoke mutation callback by specified key and store asynchronously:
```elixir
Storex.mutate("user_id", "ExampleApp.Store", "reload", ["params"])
```
"""
def mutate(key, store, mutation, payload) do
Storex.PG.broadcast({:mutate, key, store, mutation, payload})
end
end
6 changes: 1 addition & 5 deletions lib/storex/handler/cowboy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Storex.Handler.Cowboy do
def init(request, _state) do
session = Application.get_env(:storex, :session_id_library, Nanoid).generate()

Storex.Registry.register_session(session, request.pid)

{:cowboy_websocket, request, %{session: session, pid: request.pid}}
end

Expand All @@ -17,12 +15,10 @@ defmodule Storex.Handler.Cowboy do

def terminate(_reason, _req, %{session: session}) do
Storex.Registry.session_stores(session)
|> Enum.each(fn {session, store, _} ->
|> Enum.each(fn {store, _, session, _, _} ->
Storex.Supervisor.remove_store(session, store)
end)

Storex.Registry.unregister_session(session)

:ok
end

Expand Down
6 changes: 1 addition & 5 deletions lib/storex/handler/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@ defmodule Storex.Handler.Plug do
session = Application.get_env(:storex, :session_id_library, Nanoid).generate()
pid = self()

Storex.Registry.register_session(session, pid)

{:ok, %{session: session, pid: pid}}
end

def terminate(_reason, %{session: session}) do
Storex.Registry.session_stores(session)
|> Enum.each(fn {session, store, _} ->
|> Enum.each(fn {store, _, session, _, _} ->
Storex.Supervisor.remove_store(session, store)
end)

Storex.Registry.unregister_session(session)

:ok
end

Expand Down
77 changes: 77 additions & 0 deletions lib/storex/pg.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule Storex.PG do
@moduledoc false
use GenServer

@name :storex_pg

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: @name)
end

@impl true
def init(_) do
:ok = pg_join(@name)
{:ok, @name}
end

def broadcast(payload) do
pg_members(@name)
|> case do
{:error, _} ->
:error

pids ->
for pid <- pids do
send(pid, {:broadcast, payload})
end
end
end

@impl true
def handle_info({:broadcast, {:mutate, store, mutation, payload}}, state) do
Storex.Registry.get_store_instances({store, :_, :_, :_, :_})
|> Enum.map(fn {^store, _, _, session_pid, _} ->
Kernel.send(session_pid, {:mutate, store, mutation, payload})
end)

{:noreply, state}
end

@impl true
def handle_info({:broadcast, {:mutate, key, store, mutation, payload}}, state) do
Storex.Registry.get_store_instances({store, :_, :_, :_, key})
|> Enum.each(fn {^store, _, _, session_pid, ^key} ->
Kernel.send(session_pid, {:mutate, store, mutation, payload})
end)

{:noreply, state}
end

@impl true
def handle_info(_, state) do
{:noreply, state}
end

if Code.ensure_loaded?(:pg) do
defp pg_members(group) do
:pg.get_members(Storex.PG, group)
end
else
defp pg_members(group) do
:pg2.get_members({:storex, group})
end
end

if Code.ensure_loaded?(:pg) do
defp pg_join(group) do
:ok = :pg.join(Storex.PG, group, self())
end
else
defp pg_join(group) do
namespace = {:storex, group}
:ok = :pg2.create(namespace)
:ok = :pg2.join(namespace, self())
:ok
end
end
end
Loading
Loading