From 0136b22203b40a06508a711b9e15069a32d2e7ae Mon Sep 17 00:00:00 2001 From: Xinyu Liu <506895667@qq.com> Date: Wed, 5 Jul 2023 12:47:20 +0800 Subject: [PATCH] feat: support appending batch messages using hstreamdb_producer:append/2 (#74) * feat: support appending batch messages using hstreamdb_producer:append/2 * fix: fix the appup file --- src/hstreamdb.erl | 4 ++-- src/hstreamdb_erl.appup.src | 10 ++++------ src/hstreamdb_producer.erl | 18 ++++++++++++------ 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/hstreamdb.erl b/src/hstreamdb.erl index de849fc..1c01791 100644 --- a/src/hstreamdb.erl +++ b/src/hstreamdb.erl @@ -116,8 +116,8 @@ append(Producer, PartitioningKey, PayloadType, Payload) -> Record = to_record(PartitioningKey, PayloadType, Payload), append(Producer, Record). -append(Producer, Record) -> - hstreamdb_producer:append(Producer, Record). +append(Producer, Data) -> + hstreamdb_producer:append(Producer, Data). flush(Producer) -> hstreamdb_producer:flush(Producer). diff --git a/src/hstreamdb_erl.appup.src b/src/hstreamdb_erl.appup.src index 9be0636..4f63f96 100644 --- a/src/hstreamdb_erl.appup.src +++ b/src/hstreamdb_erl.appup.src @@ -1,15 +1,13 @@ %% -*- mode: erlang -*- {VSN, - [{"0.2.4", + [{"0.3.1+v0.13.0", [{load_module,hstreamdb,brutal_purge,soft_purge,[]}, - {load_module,hstreamdb_channel_mgr,brutal_purge,soft_purge,[]}, {load_module,hstreamdb_producer,brutal_purge,soft_purge,[]} ]}, - {<<".*">>,[]}], - [{"0.2.4", + {<<".*">>,[{restart_application,hstreamdb_erl}]}], + [{"0.3.1+v0.13.0", [{load_module,hstreamdb,brutal_purge,soft_purge,[]}, - {load_module,hstreamdb_channel_mgr,brutal_purge,soft_purge,[]}, {load_module,hstreamdb_producer,brutal_purge,soft_purge,[]} ]}, - {<<".*">>,[]}] + {<<".*">>,[{restart_application,hstreamdb_erl}]}] }. diff --git a/src/hstreamdb_producer.erl b/src/hstreamdb_producer.erl index b15193c..8f649c9 100644 --- a/src/hstreamdb_producer.erl +++ b/src/hstreamdb_producer.erl @@ -81,11 +81,11 @@ stop(Producer) -> ok = ecpool:stop_sup_pool(Producer), ok = ecpool:stop_sup_pool(writer_name(Producer)). -append(Producer, Record) -> +append(Producer, Records) -> ecpool:with_client( Producer, fun(Pid) -> - gen_server:call(Pid, {append, Record}) + gen_server:call(Pid, {append, Records}) end). flush(Producer) -> @@ -137,11 +137,17 @@ init(Options) -> current_batches = #{} }}. -handle_call({append, Record}, _From, State) -> +handle_call({append, Records}, _From, State) -> if_not_overflooded( fun() -> - NState = do_append(Record, State), - {reply, ok, NState} + case is_list(Records) of + false -> + {reply, ok, do_append(Records, State)}; + true -> + {reply, ok, lists:foldl(fun(Record, AccState) -> + do_append(Record, AccState) + end, State, Records)} + end end, State); @@ -213,7 +219,7 @@ do_append({PartitioningKey, Record}, NRecords = [Record], NRecordMap = RecordMap#{ShardId => NRecords}, NFlushDeadlineMap = FlushDeadlineMap#{ - ShardId => + ShardId => Interval + erlang:monotonic_time(millisecond) },