Skip to content

Commit

Permalink
Merge pull request #2306 from fengzeroz/main
Browse files Browse the repository at this point in the history
mqtt filter tag & driver cmd
  • Loading branch information
fengzeroz authored Nov 11, 2024
2 parents 141b3f5 + b291e49 commit d60e929
Show file tree
Hide file tree
Showing 20 changed files with 348 additions and 27 deletions.
1 change: 1 addition & 0 deletions include/neuron/errcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ typedef enum {
NEU_ERR_PLUGIN_TAG_VALUE_OUT_OF_RANGE = 3020,
NEU_ERR_PLUGIN_NOT_SUPPORT_SCAN_TAGS = 3021,
NEU_ERR_PLUGIN_NOT_SUPPORT_TEST_READ_TAG = 3022,
NEU_ERR_PLUGIN_NOT_SUPPORT_CMD_CALL = 3023,

NEU_ERR_MQTT_FAILURE = 4000,
NEU_ERR_MQTT_NO_CERTFILESET = 4001,
Expand Down
7 changes: 7 additions & 0 deletions include/neuron/json/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef enum neu_json_type {
NEU_JSON_ARRAY_FLOAT,
NEU_JSON_ARRAY_DOUBLE,
NEU_JSON_ARRAY_BOOL,
NEU_JSON_ARRAY_STR,
NEU_JSON_VALUE = NEU_JSON_UNDEFINE
} neu_json_type_e;

Expand Down Expand Up @@ -106,6 +107,11 @@ typedef struct {
uint8_t length;
} neu_json_value_array_bool_t;

typedef struct {
char ** p_strs;
uint16_t length;
} neu_json_value_array_str_t;

typedef union neu_json_value {
int64_t val_int;
uint8_t val_bit;
Expand All @@ -125,6 +131,7 @@ typedef union neu_json_value {
neu_json_value_array_float_t val_array_float;
neu_json_value_array_double_t val_array_double;
neu_json_value_array_bool_t val_array_bool;
neu_json_value_array_str_t val_array_str;
} neu_json_value_u;

typedef enum neu_json_attribute {
Expand Down
41 changes: 41 additions & 0 deletions include/neuron/json/neu_json_driver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* NEURON IIoT System for Industry 4.0
* Copyright (C) 2020-2022 EMQ Technologies Co., Ltd All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

#ifndef _NEU_JSON_API_DRIVER_CMD_H_
#define _NEU_JSON_API_DRIVER_CMD_H_

#include "define.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct {
char *driver;
char *cmd;
} neu_json_driver_cmd_t;

int neu_json_decode_driver_cmd_req(char *buf, neu_json_driver_cmd_t **result);
void neu_json_decode_driver_cmd_req_free(neu_json_driver_cmd_t *req);

#ifdef __cplusplus
}
#endif

#endif
12 changes: 7 additions & 5 deletions include/neuron/json/neu_json_rw.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,13 @@ int neu_json_decode_write(char *buf, neu_json_write_t **result);
void neu_json_decode_write_free(neu_json_write_t *req);

typedef struct {
char *group;
char *node;
char *name;
char *desc;
bool sync;
char * group;
char * node;
char * name;
char * desc;
bool sync;
int n_tags;
char **tags;
} neu_json_read_req_t;

int neu_json_decode_read_req(char *buf, neu_json_read_req_t **result);
Expand Down
27 changes: 22 additions & 5 deletions include/neuron/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ typedef enum neu_reqresp_type {

NEU_REQ_CHECK_SCHEMA,
NEU_RESP_CHECK_SCHEMA,

NEU_REQ_DRIVER_CMD,
} neu_reqresp_type_e;

static const char *neu_reqresp_type_string_t[] = {
Expand Down Expand Up @@ -210,6 +212,8 @@ static const char *neu_reqresp_type_string_t[] = {

[NEU_REQ_CHECK_SCHEMA] = "NEU_REQ_CHECK_SCHEMA",
[NEU_RESP_CHECK_SCHEMA] = "NEU_RESP_CHECK_SCHEMA",

[NEU_REQ_DRIVER_CMD] = "NEU_REQ_DRIVER_CMD",
};

inline static const char *neu_reqresp_type_string(neu_reqresp_type_e type)
Expand Down Expand Up @@ -238,6 +242,11 @@ typedef struct neu_resp_error {
int error;
} neu_resp_error_t;

typedef struct neu_req_driver_cmd {
char driver[NEU_NODE_NAME_LEN];
char *cmd;
} neu_req_driver_cmd_t;

typedef struct neu_req_check_schema {
char schema[NEU_PLUGIN_NAME_LEN];
} neu_req_check_schema_t;
Expand Down Expand Up @@ -637,11 +646,13 @@ typedef struct {
} neu_resp_get_nodes_state_t, neu_reqresp_nodes_state_t;

typedef struct neu_req_read_group {
char *driver;
char *group;
char *name;
char *desc;
bool sync;
char * driver;
char * group;
char * name;
char * desc;
bool sync;
uint16_t n_tag;
char ** tags;
} neu_req_read_group_t;

static inline void neu_req_read_group_fini(neu_req_read_group_t *req)
Expand All @@ -650,6 +661,12 @@ static inline void neu_req_read_group_fini(neu_req_read_group_t *req)
free(req->group);
free(req->name);
free(req->desc);
if (req->n_tag > 0) {
for (uint16_t i = 0; i < req->n_tag; i++) {
free(req->tags[i]);
}
free(req->tags);
}
}

typedef struct neu_req_read_group_paginate {
Expand Down
1 change: 1 addition & 0 deletions include/neuron/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ typedef struct neu_plugin_intf_funs {
char *ctx);
int (*test_read_tag)(neu_plugin_t *plugin, void *req,
neu_datatag_t tag);
int (*call)(neu_plugin_t *plugin, const char *cmd);
} driver;
};

Expand Down
55 changes: 53 additions & 2 deletions plugins/mqtt/mqtt_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "otel/otel_manager.h"
#include "utils/asprintf.h"
#include "version.h"
#include "json/neu_json_driver.h"
#include "json/neu_json_mqtt.h"
#include "json/neu_json_rw.h"

Expand Down Expand Up @@ -179,6 +180,26 @@ static char *generate_heartbeat_json(neu_plugin_t *plugin, UT_array *states,
return json_str;
}

static inline int send_driver_cmd(neu_plugin_t * plugin,
neu_json_driver_cmd_t *req)
{
plog_notice(plugin, "driver cmd, driver:%s, cmd:%s", req->driver, req->cmd);
neu_reqresp_head_t header = { 0 };
header.ctx = NULL;

neu_req_driver_cmd_t cmd = { 0 };
strncpy(cmd.driver, req->driver, NEU_NODE_NAME_LEN);
cmd.cmd = strdup(req->cmd);

if (0 != neu_plugin_op(plugin, header, &cmd)) {
free(cmd.cmd);
plog_error(plugin, "neu_plugin_op(NEU_REQ_DRIVER_CMD) fail");
return -1;
}

return 0;
}

static inline int send_read_req(neu_plugin_t *plugin, neu_json_mqtt_t *mqtt,
neu_json_read_req_t *req)
{
Expand All @@ -198,8 +219,14 @@ static inline int send_read_req(neu_plugin_t *plugin, neu_json_mqtt_t *mqtt,
cmd.driver = req->node;
cmd.group = req->group;
cmd.sync = req->sync;
req->node = NULL; // ownership moved
req->group = NULL; // ownership moved
cmd.n_tag = req->n_tags;
cmd.tags = req->tags;

req->node = NULL; // ownership moved
req->group = NULL; // ownership moved
req->n_tags = 0;
req->tags = NULL;

if (0 != neu_plugin_op(plugin, header, &cmd)) {
neu_req_read_group_fini(&cmd);
plog_error(plugin, "neu_plugin_op(NEU_REQ_READ_GROUP) fail");
Expand Down Expand Up @@ -527,6 +554,30 @@ int handle_write_response(neu_plugin_t *plugin, neu_json_mqtt_t *mqtt_json,
return rv;
}

void handle_driver_cmd_req(neu_mqtt_qos_e qos, const char *topic,
const uint8_t *payload, uint32_t len, void *data,
trace_w3c_t *trace_w3c)
{
(void) qos;
(void) topic;
(void) trace_w3c;
neu_plugin_t * plugin = data;
neu_json_driver_cmd_t *req = NULL;

char *json_str = calloc(len + 1, sizeof(char));
memcpy(json_str, payload, len);

int rv = neu_json_decode_driver_cmd_req(json_str, &req);
if (rv != 0) {
plog_error(plugin, "neu_json_decode_driver_cmd_req failed");
free(json_str);
return;
}

neu_json_decode_driver_cmd_req_free(req);
free(json_str);
}

void handle_read_req(neu_mqtt_qos_e qos, const char *topic,
const uint8_t *payload, uint32_t len, void *data,
trace_w3c_t *trace_w3c)
Expand Down
4 changes: 4 additions & 0 deletions plugins/mqtt/mqtt_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ void handle_read_req(neu_mqtt_qos_e qos, const char *topic,
int handle_read_response(neu_plugin_t *plugin, neu_json_mqtt_t *mqtt_json,
neu_resp_read_group_t *data);

void handle_driver_cmd_req(neu_mqtt_qos_e qos, const char *topic,
const uint8_t *payload, uint32_t len, void *data,
trace_w3c_t *trace_w3c);

char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
mqtt_upload_format_e format);
int handle_trans_data(neu_plugin_t * plugin,
Expand Down
1 change: 1 addition & 0 deletions plugins/mqtt/mqtt_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct neu_plugin {
char * read_req_topic;
char * read_resp_topic;
char * upload_topic;
char * driver_cmd_topic;
route_entry_t * route_tbl;

int (*parse_config)(neu_plugin_t *plugin, const char *setting,
Expand Down
19 changes: 19 additions & 0 deletions plugins/mqtt/mqtt_plugin_intf.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ int mqtt_plugin_uninit(neu_plugin_t *plugin)
plugin->read_resp_topic = NULL;
free(plugin->upload_topic);
plugin->upload_topic = NULL;
free(plugin->driver_cmd_topic);
plugin->driver_cmd_topic = NULL;

route_tbl_free(plugin->route_tbl);

Expand Down Expand Up @@ -279,6 +281,14 @@ static int create_topic(neu_plugin_t *plugin)
return -1;
}

neu_asprintf(&plugin->driver_cmd_topic, "/neuron/%s/driver/cmd",
plugin->common.name);
if (NULL == plugin->driver_cmd_topic) {
free(plugin->driver_cmd_topic);
plugin->driver_cmd_topic = NULL;
return -1;
}

return 0;
}

Expand Down Expand Up @@ -306,13 +316,22 @@ static int subscribe(neu_plugin_t *plugin, const mqtt_config_t *config)
return NEU_ERR_MQTT_SUBSCRIBE_FAILURE;
}

if (0 !=
neu_mqtt_client_subscribe(plugin->client, config->qos,
plugin->driver_cmd_topic, plugin,
handle_driver_cmd_req)) {
plog_error(plugin, "subscribe [%s] fail", plugin->driver_cmd_topic);
return NEU_ERR_MQTT_SUBSCRIBE_FAILURE;
}

return 0;
}

static int unsubscribe(neu_plugin_t *plugin, const mqtt_config_t *config)
{
neu_mqtt_client_unsubscribe(plugin->client, plugin->read_req_topic);
neu_mqtt_client_unsubscribe(plugin->client, config->write_req_topic);
neu_mqtt_client_unsubscribe(plugin->client, plugin->driver_cmd_topic);
neu_msleep(100); // wait for message completion
return 0;
}
Expand Down
12 changes: 12 additions & 0 deletions src/adapter/adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,11 @@ static int adapter_command(neu_adapter_t *adapter, neu_reqresp_head_t header,

strcpy(pheader->sender, adapter->name);
switch (pheader->type) {
case NEU_REQ_DRIVER_CMD: {
neu_req_driver_cmd_t *cmd = (neu_req_driver_cmd_t *) data;
strcpy(pheader->receiver, cmd->driver);
break;
}
case NEU_REQ_READ_GROUP: {
neu_req_read_group_t *cmd = (neu_req_read_group_t *) data;
strcpy(pheader->receiver, cmd->driver);
Expand Down Expand Up @@ -1399,6 +1404,13 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
neu_msg_free(msg);
break;
}
case NEU_REQ_DRIVER_CMD: {
neu_req_driver_cmd_t *cmd = (neu_req_driver_cmd_t *) &header[1];
neu_adapter_driver_cmd((neu_adapter_driver_t *) adapter,
(const char *) cmd->cmd);
free(cmd->cmd);
break;
}
default:
nlog_warn("adapter: %s recv msg type error, type: %s", adapter->name,
neu_reqresp_type_string(header->type));
Expand Down
17 changes: 15 additions & 2 deletions src/adapter/driver/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ void neu_adapter_driver_read_group(neu_adapter_driver_t *driver,

neu_resp_read_group_t resp = { 0 };
neu_group_t * group = g->group;
UT_array *tags = neu_group_query_read_tag(group, cmd->name, cmd->desc);
UT_array *tags = neu_group_query_read_tag(group, cmd->name, cmd->desc,
cmd->n_tag, cmd->tags);

utarray_new(resp.tags, neu_resp_tag_value_meta_icd());

Expand Down Expand Up @@ -651,7 +652,7 @@ void neu_adapter_driver_read_group_paginate(neu_adapter_driver_t *driver,
group, cmd->name, cmd->desc, cmd->current_page, cmd->page_size,
&resp.total_count);
} else {
tags = neu_group_query_read_tag(group, cmd->name, cmd->desc);
tags = neu_group_query_read_tag(group, cmd->name, cmd->desc, 0, NULL);
resp.total_count = utarray_len(tags);
}

Expand Down Expand Up @@ -3020,4 +3021,16 @@ void neu_adapter_driver_test_read_tag(neu_adapter_driver_t *driver,
if (g != NULL && neu_group_tag_size(g->group) > 0) {
start_group_timer(driver, g);
}
}

int neu_adapter_driver_cmd(neu_adapter_driver_t *driver, const char *cmd)
{
if (driver->adapter.state != NEU_NODE_RUNNING_STATE_RUNNING) {
return NEU_ERR_PLUGIN_NOT_RUNNING;
}
if (driver->adapter.module->intf_funs->driver.call == NULL) {
return NEU_ERR_PLUGIN_NOT_SUPPORT_CMD_CALL;
}
return driver->adapter.module->intf_funs->driver.call(
driver->adapter.plugin, cmd);
}
3 changes: 3 additions & 0 deletions src/adapter/driver/driver_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,7 @@ void neu_adapter_driver_unsubscribe(neu_adapter_driver_t * driver,

void neu_adapter_driver_scan_tags(neu_adapter_driver_t *driver,
neu_reqresp_head_t * req);

int neu_adapter_driver_cmd(neu_adapter_driver_t *driver, const char *cmd);

#endif
Loading

0 comments on commit d60e929

Please sign in to comment.