Skip to content

Commit

Permalink
Merge pull request #2253 from ZhouBox/v2.10
Browse files Browse the repository at this point in the history
[v2.10]otel:add api and use new api
  • Loading branch information
fengzeroz authored Oct 12, 2024
2 parents a02390b + 32b7424 commit b24d350
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 41 deletions.
3 changes: 3 additions & 0 deletions include/neuron/otel/otel_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ neu_otel_trace_ctx neu_otel_find_trace_by_id(const char *trace_id);
void neu_otel_free_trace(neu_otel_trace_ctx ctx);

neu_otel_scope_ctx neu_otel_add_span(neu_otel_trace_ctx ctx);
neu_otel_scope_ctx neu_otel_add_span2(neu_otel_trace_ctx ctx,
const char * span_name,
const char * span_id);

void neu_otel_scope_set_parent_span_id(neu_otel_scope_ctx ctx,
const char * parent_span_id);
Expand Down
11 changes: 2 additions & 9 deletions plugins/ekuiper/plugin_ekuiper.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,10 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin,
if (neu_otel_data_is_started() && trans_data->trace_ctx) {
trans_trace = neu_otel_find_trace(trans_data->trace_ctx);
if (trans_trace) {
trans_scope = neu_otel_add_span(trans_trace);
neu_otel_scope_set_span_name(trans_scope, "ekuiper send");
char new_span_id[36] = { 0 };
neu_otel_new_span_id(new_span_id);
neu_otel_scope_set_span_id(trans_scope, new_span_id);
uint8_t *p_sp_id =
neu_otel_scope_get_pre_span_id(trans_scope);
if (p_sp_id) {
neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id,
8);
}
trans_scope = neu_otel_add_span2(
trans_trace, "ekuiper send", new_span_id);
neu_otel_scope_add_span_attr_int(trans_scope, "thread id",
(int64_t)(pthread_self()));
neu_otel_scope_add_span_attr_int(
Expand Down
13 changes: 3 additions & 10 deletions plugins/ekuiper/read_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,9 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data)
if (neu_otel_data_is_started() && trans_data->trace_ctx) {
trans_trace = neu_otel_find_trace(trans_data->trace_ctx);
if (trans_trace) {
trans_scope = neu_otel_add_span(trans_trace);
neu_otel_scope_set_span_name(trans_scope, "ekuiper send");
neu_otel_new_span_id(new_span_id);
neu_otel_scope_set_span_id(trans_scope, new_span_id);
uint8_t *p_sp_id = neu_otel_scope_get_pre_span_id(trans_scope);
if (p_sp_id) {
neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, 8);
}
trans_scope =
neu_otel_add_span2(trans_trace, "ekuiper send", new_span_id);
neu_otel_scope_add_span_attr_int(trans_scope, "thread id",
(int64_t)(pthread_self()));
neu_otel_scope_set_span_start_time(trans_scope, neu_time_ms());
Expand Down Expand Up @@ -137,8 +132,6 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data)
}

if (trans_trace) {
neu_otel_scope_add_span_attr_string(trans_scope, "playload",
json_str);
trace_id = neu_otel_get_trace_id(trans_trace);
uint8_t span_id[8] = { 0 };
hex_string_to_binary(new_span_id, span_id, 8);
Expand Down Expand Up @@ -172,7 +165,7 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data)
} while (0);

if (trans_trace) {
neu_otel_scope_add_span_attr_int(trans_scope, "error", rv);
neu_otel_scope_add_span_attr_int(trans_scope, "rv", rv);
neu_otel_scope_set_span_end_time(trans_scope, neu_time_ms());
neu_otel_trace_set_final(trans_trace);
}
Expand Down
17 changes: 3 additions & 14 deletions plugins/mqtt/mqtt_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -607,15 +607,10 @@ int handle_trans_data(neu_plugin_t * plugin,
if (neu_otel_data_is_started() && trans_data->trace_ctx) {
trans_trace = neu_otel_find_trace(trans_data->trace_ctx);
if (trans_trace) {
trans_scope = neu_otel_add_span(trans_trace);
neu_otel_scope_set_span_name(trans_scope, "mqtt publish");
char new_span_id[36] = { 0 };
neu_otel_new_span_id(new_span_id);
neu_otel_scope_set_span_id(trans_scope, new_span_id);
uint8_t *p_sp_id = neu_otel_scope_get_pre_span_id(trans_scope);
if (p_sp_id) {
neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, 8);
}
trans_scope =
neu_otel_add_span2(trans_trace, "mqtt publish", new_span_id);
neu_otel_scope_add_span_attr_int(trans_scope, "thread id",
(int64_t)(pthread_self()));
neu_otel_scope_set_span_start_time(trans_scope, neu_time_ms());
Expand Down Expand Up @@ -661,23 +656,17 @@ int handle_trans_data(neu_plugin_t * plugin,
neu_mqtt_qos_e qos = plugin->config.qos;

if (plugin->config.version == NEU_MQTT_VERSION_V5 && trans_trace) {
neu_otel_scope_add_span_attr_string(trans_scope, "playload",
json_str);
rv = publish_with_trace(plugin, qos, topic, json_str,
strlen(json_str), trace_parent);
} else {
if (trans_trace) {
neu_otel_scope_add_span_attr_string(trans_scope, "playload",
json_str);
}
rv = publish(plugin, qos, topic, json_str, strlen(json_str));
}

json_str = NULL;
} while (0);

if (trans_trace) {
neu_otel_scope_add_span_attr_int(trans_scope, "error", rv);
neu_otel_scope_add_span_attr_int(trans_scope, "rv", rv);
neu_otel_scope_set_span_end_time(trans_scope, neu_time_ms());
neu_otel_trace_set_final(trans_trace);
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/mqtt/mqtt_plugin_intf.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
}

if (trace) {
neu_otel_scope_add_span_attr_int(scope, "handle error", error);
neu_otel_scope_add_span_attr_int(scope, "rv", error);
neu_otel_scope_set_span_end_time(scope, neu_time_ms());
neu_otel_trace_set_final(trace);
}
Expand Down
9 changes: 2 additions & 7 deletions src/adapter/driver/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -1987,15 +1987,10 @@ static int report_callback(void *usr_data)
if (neu_otel_data_is_started() && data->trace_ctx) {
trans_trace = neu_otel_find_trace(data->trace_ctx);
if (trans_trace) {
trans_scope = neu_otel_add_span(trans_trace);
neu_otel_scope_set_span_name(trans_scope, "report cb");
char new_span_id[36] = { 0 };
neu_otel_new_span_id(new_span_id);
neu_otel_scope_set_span_id(trans_scope, new_span_id);
uint8_t *p_sp_id = neu_otel_scope_get_pre_span_id(trans_scope);
if (p_sp_id) {
neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, 8);
}
trans_scope =
neu_otel_add_span2(trans_trace, "report cb", new_span_id);
neu_otel_scope_add_span_attr_int(trans_scope, "thread id",
(int64_t)(pthread_self()));
neu_otel_scope_set_span_start_time(trans_scope, neu_time_ms());
Expand Down
81 changes: 81 additions & 0 deletions src/otel/otel_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ static int hex_char_to_int(char c)
static int hex_string_to_binary(const char * hex_string,
unsigned char *binary_array, int max_length)
{
if (hex_string == NULL) {
return -1;
}
int length = strlen(hex_string);
if (length % 2 != 0 || length <= 0)
return -1;
Expand Down Expand Up @@ -601,6 +604,84 @@ neu_otel_scope_ctx neu_otel_add_span(neu_otel_trace_ctx ctx)
return scope;
}

neu_otel_scope_ctx neu_otel_add_span2(neu_otel_trace_ctx ctx,
const char * span_name,
const char * span_id)
{
trace_scope_t *scope = calloc(1, sizeof(trace_scope_t));
trace_ctx_t * trace_ctx = (trace_ctx_t *) ctx;
pthread_mutex_lock(&trace_ctx->mutex);
scope->trace_ctx = ctx;
if (trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans == 0) {
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans =
calloc(1, sizeof(Opentelemetry__Proto__Trace__V1__Span *));
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans[0] =
calloc(1, sizeof(Opentelemetry__Proto__Trace__V1__Span));

trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans = 1;
scope->span =
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans[0];
scope->span_index = 0;
} else {
Opentelemetry__Proto__Trace__V1__Span **t_spans =
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans;
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans =
calloc(1 +
trace_ctx->trace_data.resource_spans[0]
->scope_spans[0]
->n_spans,
sizeof(Opentelemetry__Proto__Trace__V1__Span *));
for (size_t i = 0; i <
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans;
i++) {
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans[i] =
t_spans[i];
}

free(t_spans);

trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans
[trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans] =
calloc(1, sizeof(Opentelemetry__Proto__Trace__V1__Span));
scope->span = trace_ctx->trace_data.resource_spans[0]
->scope_spans[0]
->spans[trace_ctx->trace_data.resource_spans[0]
->scope_spans[0]
->n_spans];
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans += 1;

scope->span_index =
trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans -
1;
}

opentelemetry__proto__trace__v1__span__init(scope->span);
scope->span->kind =
OPENTELEMETRY__PROTO__TRACE__V1__SPAN__SPAN_KIND__SPAN_KIND_SERVER;
uint8_t *t_id = calloc(1, 16);
hex_string_to_binary((char *) trace_ctx->trace_id, t_id, 16);
scope->span->trace_id.data = t_id;
scope->span->trace_id.len = 16;
scope->span->flags = trace_ctx->flags;
neu_otel_scope_set_span_name(scope, span_name);
neu_otel_scope_set_span_id(scope, span_id);
if (scope->span_index != 0) {
neu_otel_scope_set_parent_span_id2(
scope,
trace_ctx->trace_data.resource_spans[0]
->scope_spans[0]
->spans[scope->span_index - 1]
->span_id.data,
trace_ctx->trace_data.resource_spans[0]
->scope_spans[0]
->spans[scope->span_index - 1]
->span_id.len);
}
utarray_push_back(trace_ctx->scopes, &scope);
pthread_mutex_unlock(&trace_ctx->mutex);
return scope;
}

void neu_otel_scope_set_parent_span_id(neu_otel_scope_ctx ctx,
const char * parent_span_id)
{
Expand Down

0 comments on commit b24d350

Please sign in to comment.