Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle subscription update for eKuiper and MQTT plugin #1439

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/ekuiper/plugin_ekuiper.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin,
send_data(plugin, trans_data);
break;
}
case NEU_REQ_SUBSCRIBE_GROUP: {
case NEU_REQ_SUBSCRIBE_GROUP:
case NEU_REQ_UPDATE_SUBSCRIBE_GROUP: {
neu_req_subscribe_t *sub_info = data;
free(sub_info->params);
break;
Expand Down
39 changes: 36 additions & 3 deletions plugins/mqtt/mqtt_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ int handle_subscribe_group(neu_plugin_t *plugin, neu_req_subscribe_t *sub_info)
// no parameters, try default topic
topic.v.val_str = default_upload_topic(sub_info);
if (NULL == topic.v.val_str) {
rv = NEU_ERR_EINTERNAL;
goto end;
}
} else if (0 != neu_parse_param(sub_info->params, NULL, 1, &topic)) {
Expand All @@ -599,9 +600,41 @@ int handle_subscribe_group(neu_plugin_t *plugin, neu_req_subscribe_t *sub_info)
sub_info->group, topic.v.val_str);
// topic.v.val_str ownership moved
if (0 != rv) {
plog_error(plugin, "route driver:%s group:%s to topic:%s fail, `%s`",
sub_info->driver, sub_info->group, topic.v.val_str,
sub_info->params);
plog_error(plugin, "route driver:%s group:%s fail, `%s`",
sub_info->driver, sub_info->group, sub_info->params);
goto end;
}

plog_notice(plugin, "route driver:%s group:%s to topic:%s",
sub_info->driver, sub_info->group, topic.v.val_str);

end:
free(sub_info->params);
return rv;
}

int handle_update_subscribe(neu_plugin_t *plugin, neu_req_subscribe_t *sub_info)
{
int rv = 0;

if (NULL == sub_info->params) {
rv = NEU_ERR_GROUP_PARAMETER_INVALID;
goto end;
}

neu_json_elem_t topic = { .name = "topic", .t = NEU_JSON_STR };
if (0 != neu_parse_param(sub_info->params, NULL, 1, &topic)) {
plog_error(plugin, "parse `%s` for topic fail", sub_info->params);
rv = NEU_ERR_GROUP_PARAMETER_INVALID;
goto end;
}

rv = route_tbl_update(&plugin->route_tbl, sub_info->driver, sub_info->group,
topic.v.val_str);
// topic.v.val_str ownership moved
if (0 != rv) {
plog_error(plugin, "route driver:%s group:%s fail, `%s`",
sub_info->driver, sub_info->group, sub_info->params);
goto end;
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/mqtt/mqtt_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ int handle_trans_data(neu_plugin_t * plugin,
neu_reqresp_trans_data_t *trans_data);

int handle_subscribe_group(neu_plugin_t *plugin, neu_req_subscribe_t *sub_info);
int handle_update_subscribe(neu_plugin_t * plugin,
neu_req_subscribe_t *sub_info);
int handle_unsubscribe_group(neu_plugin_t * plugin,
neu_req_unsubscribe_t *unsub_info);

Expand Down
3 changes: 3 additions & 0 deletions plugins/mqtt/mqtt_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ static int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
case NEU_REQ_SUBSCRIBE_GROUP:
error = handle_subscribe_group(plugin, data);
break;
case NEU_REQ_UPDATE_SUBSCRIBE_GROUP:
error = handle_update_subscribe(plugin, data);
break;
case NEU_REQ_UNSUBSCRIBE_GROUP:
error = handle_unsubscribe_group(plugin, data);
break;
Expand Down
18 changes: 18 additions & 0 deletions plugins/mqtt/mqtt_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,24 @@ static inline int route_tbl_add_new(route_entry_t **tbl, const char *driver,
return 0;
}

// NOTE: we take ownership of `topic`
static inline int route_tbl_update(route_entry_t **tbl, const char *driver,
const char *group, char *topic)
{
route_entry_t *find = NULL;

find = route_tbl_get(tbl, driver, group);
if (NULL == find) {
free(topic);
return NEU_ERR_GROUP_NOT_SUBSCRIBE;
}

free(find->topic);
find->topic = topic;

return 0;
}

static inline void route_tbl_del(route_entry_t **tbl, const char *driver,
const char *group)
{
Expand Down
4 changes: 2 additions & 2 deletions src/core/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ int neu_subscribe_manager_update_params(neu_subscribe_mgr_t *mgr,
return NEU_ERR_GROUP_NOT_SUBSCRIBE;
}

char *p = strdup(params);
if (NULL == p) {
char *p = NULL;
if (params && NULL == (p = strdup(params))) {
return NEU_ERR_EINTERNAL;
}

Expand Down
Loading