Skip to content

Commit

Permalink
feat: support appending batch messages using hstreamdb_producer:appen…
Browse files Browse the repository at this point in the history
…d/2 (#74)

* feat: support appending batch messages using hstreamdb_producer:append/2

* fix: fix the appup file
  • Loading branch information
terry-xiaoyu authored Jul 5, 2023
1 parent bb962af commit 0136b22
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/hstreamdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ append(Producer, PartitioningKey, PayloadType, Payload) ->
Record = to_record(PartitioningKey, PayloadType, Payload),
append(Producer, Record).

append(Producer, Record) ->
hstreamdb_producer:append(Producer, Record).
append(Producer, Data) ->
hstreamdb_producer:append(Producer, Data).

flush(Producer) ->
hstreamdb_producer:flush(Producer).
Expand Down
10 changes: 4 additions & 6 deletions src/hstreamdb_erl.appup.src
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
%% -*- mode: erlang -*-
{VSN,
[{"0.2.4",
[{"0.3.1+v0.13.0",
[{load_module,hstreamdb,brutal_purge,soft_purge,[]},
{load_module,hstreamdb_channel_mgr,brutal_purge,soft_purge,[]},
{load_module,hstreamdb_producer,brutal_purge,soft_purge,[]}
]},
{<<".*">>,[]}],
[{"0.2.4",
{<<".*">>,[{restart_application,hstreamdb_erl}]}],
[{"0.3.1+v0.13.0",
[{load_module,hstreamdb,brutal_purge,soft_purge,[]},
{load_module,hstreamdb_channel_mgr,brutal_purge,soft_purge,[]},
{load_module,hstreamdb_producer,brutal_purge,soft_purge,[]}
]},
{<<".*">>,[]}]
{<<".*">>,[{restart_application,hstreamdb_erl}]}]
}.
18 changes: 12 additions & 6 deletions src/hstreamdb_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ stop(Producer) ->
ok = ecpool:stop_sup_pool(Producer),
ok = ecpool:stop_sup_pool(writer_name(Producer)).

append(Producer, Record) ->
append(Producer, Records) ->
ecpool:with_client(
Producer,
fun(Pid) ->
gen_server:call(Pid, {append, Record})
gen_server:call(Pid, {append, Records})
end).

flush(Producer) ->
Expand Down Expand Up @@ -137,11 +137,17 @@ init(Options) ->
current_batches = #{}
}}.

handle_call({append, Record}, _From, State) ->
handle_call({append, Records}, _From, State) ->
if_not_overflooded(
fun() ->
NState = do_append(Record, State),
{reply, ok, NState}
case is_list(Records) of
false ->
{reply, ok, do_append(Records, State)};
true ->
{reply, ok, lists:foldl(fun(Record, AccState) ->
do_append(Record, AccState)
end, State, Records)}
end
end,
State);

Expand Down Expand Up @@ -213,7 +219,7 @@ do_append({PartitioningKey, Record},
NRecords = [Record],
NRecordMap = RecordMap#{ShardId => NRecords},
NFlushDeadlineMap = FlushDeadlineMap#{
ShardId =>
ShardId =>
Interval + erlang:monotonic_time(millisecond)
},

Expand Down

0 comments on commit 0136b22

Please sign in to comment.