Skip to content

Commit

Permalink
feat(core): fulfill update subscription params requests
Browse files Browse the repository at this point in the history
  • Loading branch information
eeff committed Aug 2, 2023
1 parent 5f0e698 commit 1b3d383
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 0 deletions.
13 changes: 13 additions & 0 deletions include/neuron/persist/persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ int neu_persister_store_subscription(const char *app_name,
const char *group_name,
const char *params);

/**
* Update subscriptions.
* @param app_name name of the app node
* @param driver_name name of the driver node
* @param group_name name of the group
* @param params subscription params
* @return 0 on success, non-zero otherwise
*/
int neu_persister_update_subscription(const char *app_name,
const char *driver_name,
const char *group_name,
const char *params);

/**
* Load adapter subscriptions.
* @param app_name name of the app node
Expand Down
20 changes: 20 additions & 0 deletions src/core/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,26 @@ static int manager_loop(enum neu_event_io_type type, int fd, void *usr_data)
reply(manager, header, &error);
break;
}
case NEU_REQ_UPDATE_SUBSCRIBE_GROUP: {
neu_req_subscribe_t *cmd = (neu_req_subscribe_t *) &header[1];
neu_resp_error_t error = { 0 };

error.error = neu_manager_update_subscribe(
manager, cmd->app, cmd->driver, cmd->group, cmd->params);

if (error.error == NEU_ERR_SUCCESS) {
forward_msg(manager, msg, cmd->app);
manager_storage_update_subscribe(manager, cmd->app, cmd->driver,
cmd->group, cmd->params);
} else {
free(cmd->params);
}

header->type = NEU_RESP_ERROR;
strcpy(header->receiver, header->sender);
reply(manager, header, &error);
break;
}
case NEU_REQ_UNSUBSCRIBE_GROUP: {
neu_req_unsubscribe_t *cmd = (neu_req_unsubscribe_t *) &header[1];
neu_resp_error_t error = { 0 };
Expand Down
8 changes: 8 additions & 0 deletions src/core/manager_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,14 @@ int neu_manager_subscribe(neu_manager_t *manager, const char *app,
return manager_subscribe(manager, app, driver, group, params);
}

int neu_manager_update_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params)
{
return neu_subscribe_manager_update_params(manager->subscribe_manager, app,
driver, group, params);
}

int neu_manager_send_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params)
Expand Down
3 changes: 3 additions & 0 deletions src/core/manager_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ UT_array *neu_manager_get_driver_group(neu_manager_t *manager);
int neu_manager_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params);
int neu_manager_update_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params);
int neu_manager_send_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params);
Expand Down
12 changes: 12 additions & 0 deletions src/core/storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ void manager_storage_subscribe(neu_manager_t *manager, const char *app,
}
}

void manager_storage_update_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params)
{
(void) manager;
int rv = neu_persister_update_subscription(app, driver, group, params);
if (0 != rv) {
nlog_error("fail update subscription app:%s driver:%s group:%s", app,
driver, group);
}
}

void manager_storage_unsubscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group)
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ void manager_storage_update_node(neu_manager_t *manager, const char *node,
void manager_storage_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params);
void manager_storage_update_subscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group,
const char *params);
void manager_storage_unsubscribe(neu_manager_t *manager, const char *app,
const char *driver, const char *group);
void manager_storage_add_ndriver_map(neu_manager_t *manager,
Expand Down
38 changes: 38 additions & 0 deletions src/core/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,44 @@ int neu_subscribe_manager_sub(neu_subscribe_mgr_t *mgr, const char *driver,
return NEU_ERR_SUCCESS;
}

int neu_subscribe_manager_update_params(neu_subscribe_mgr_t *mgr,
const char *app, const char *driver,
const char *group, const char *params)
{
sub_elem_key_t key = { 0 };
strncpy(key.driver, driver, sizeof(key.driver));
strncpy(key.group, group, sizeof(key.group));

sub_elem_t *find = NULL;
HASH_FIND(hh, mgr->ss, &key, sizeof(sub_elem_key_t), find);

if (NULL == find) {
return NEU_ERR_GROUP_NOT_SUBSCRIBE;
}

neu_app_subscribe_t *app_sub = NULL;
utarray_foreach(find->apps, neu_app_subscribe_t *, sub)
{
if (strcmp(sub->app_name, app) == 0) {
app_sub = sub;
break;
}
}

if (NULL == app_sub) {
return NEU_ERR_GROUP_NOT_SUBSCRIBE;
}

char *p = strdup(params);
if (NULL == p) {
return NEU_ERR_EINTERNAL;
}

free(app_sub->params);
app_sub->params = p;
return NEU_ERR_SUCCESS;
}

int neu_subscribe_manager_unsub(neu_subscribe_mgr_t *mgr, const char *driver,
const char *app, const char *group)
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@ int neu_subscribe_manager_update_app_name(neu_subscribe_mgr_t *mgr,
int neu_subscribe_manager_update_driver_name(neu_subscribe_mgr_t *mgr,
const char * driver,
const char * new_name);
int neu_subscribe_manager_update_params(neu_subscribe_mgr_t *mgr,
const char *app, const char *driver,
const char *group, const char *params);

#endif
11 changes: 11 additions & 0 deletions src/persist/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,17 @@ int neu_persister_store_subscription(const char *app_name,
app_name, driver_name, group_name, params);
}

int neu_persister_update_subscription(const char *app_name,
const char *driver_name,
const char *group_name,
const char *params)
{
return execute_sql(global_db,
"UPDATE subscriptions SET params=%Q "
"WHERE app_name=%Q AND driver_name=%Q AND group_name=%Q",
params, app_name, driver_name, group_name);
}

static UT_icd subscription_info_icd = {
sizeof(neu_persist_subscription_info_t),
NULL,
Expand Down

0 comments on commit 1b3d383

Please sign in to comment.