From 32b74243339b5c9c60175a7f87a92518cc831f52 Mon Sep 17 00:00:00 2001 From: "xiang.zhou" Date: Sat, 12 Oct 2024 12:59:01 +0800 Subject: [PATCH] otel:add api and use new api --- include/neuron/otel/otel_manager.h | 3 ++ plugins/ekuiper/plugin_ekuiper.c | 11 +--- plugins/ekuiper/read_write.c | 13 ++--- plugins/mqtt/mqtt_handle.c | 17 ++----- plugins/mqtt/mqtt_plugin_intf.c | 2 +- src/adapter/driver/driver.c | 9 +--- src/otel/otel_manager.c | 81 ++++++++++++++++++++++++++++++ 7 files changed, 95 insertions(+), 41 deletions(-) diff --git a/include/neuron/otel/otel_manager.h b/include/neuron/otel/otel_manager.h index c8b2b0e31..7976e058f 100644 --- a/include/neuron/otel/otel_manager.h +++ b/include/neuron/otel/otel_manager.h @@ -42,6 +42,9 @@ neu_otel_trace_ctx neu_otel_find_trace_by_id(const char *trace_id); void neu_otel_free_trace(neu_otel_trace_ctx ctx); neu_otel_scope_ctx neu_otel_add_span(neu_otel_trace_ctx ctx); +neu_otel_scope_ctx neu_otel_add_span2(neu_otel_trace_ctx ctx, + const char * span_name, + const char * span_id); void neu_otel_scope_set_parent_span_id(neu_otel_scope_ctx ctx, const char * parent_span_id); diff --git a/plugins/ekuiper/plugin_ekuiper.c b/plugins/ekuiper/plugin_ekuiper.c index 3a756cb1a..c00fae3eb 100644 --- a/plugins/ekuiper/plugin_ekuiper.c +++ b/plugins/ekuiper/plugin_ekuiper.c @@ -367,17 +367,10 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin, if (neu_otel_data_is_started() && trans_data->trace_ctx) { trans_trace = neu_otel_find_trace(trans_data->trace_ctx); if (trans_trace) { - trans_scope = neu_otel_add_span(trans_trace); - neu_otel_scope_set_span_name(trans_scope, "ekuiper send"); char new_span_id[36] = { 0 }; neu_otel_new_span_id(new_span_id); - neu_otel_scope_set_span_id(trans_scope, new_span_id); - uint8_t *p_sp_id = - neu_otel_scope_get_pre_span_id(trans_scope); - if (p_sp_id) { - neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, - 8); - } + trans_scope = neu_otel_add_span2( + trans_trace, "ekuiper send", new_span_id); neu_otel_scope_add_span_attr_int(trans_scope, "thread id", (int64_t)(pthread_self())); neu_otel_scope_add_span_attr_int( diff --git a/plugins/ekuiper/read_write.c b/plugins/ekuiper/read_write.c index 08e5aae3a..379a87a8f 100644 --- a/plugins/ekuiper/read_write.c +++ b/plugins/ekuiper/read_write.c @@ -100,14 +100,9 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data) if (neu_otel_data_is_started() && trans_data->trace_ctx) { trans_trace = neu_otel_find_trace(trans_data->trace_ctx); if (trans_trace) { - trans_scope = neu_otel_add_span(trans_trace); - neu_otel_scope_set_span_name(trans_scope, "ekuiper send"); neu_otel_new_span_id(new_span_id); - neu_otel_scope_set_span_id(trans_scope, new_span_id); - uint8_t *p_sp_id = neu_otel_scope_get_pre_span_id(trans_scope); - if (p_sp_id) { - neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, 8); - } + trans_scope = + neu_otel_add_span2(trans_trace, "ekuiper send", new_span_id); neu_otel_scope_add_span_attr_int(trans_scope, "thread id", (int64_t)(pthread_self())); neu_otel_scope_set_span_start_time(trans_scope, neu_time_ms()); @@ -137,8 +132,6 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data) } if (trans_trace) { - neu_otel_scope_add_span_attr_string(trans_scope, "playload", - json_str); trace_id = neu_otel_get_trace_id(trans_trace); uint8_t span_id[8] = { 0 }; hex_string_to_binary(new_span_id, span_id, 8); @@ -172,7 +165,7 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data) } while (0); if (trans_trace) { - neu_otel_scope_add_span_attr_int(trans_scope, "error", rv); + neu_otel_scope_add_span_attr_int(trans_scope, "rv", rv); neu_otel_scope_set_span_end_time(trans_scope, neu_time_ms()); neu_otel_trace_set_final(trans_trace); } diff --git a/plugins/mqtt/mqtt_handle.c b/plugins/mqtt/mqtt_handle.c index 01b7d597a..3ab402bad 100644 --- a/plugins/mqtt/mqtt_handle.c +++ b/plugins/mqtt/mqtt_handle.c @@ -607,15 +607,10 @@ int handle_trans_data(neu_plugin_t * plugin, if (neu_otel_data_is_started() && trans_data->trace_ctx) { trans_trace = neu_otel_find_trace(trans_data->trace_ctx); if (trans_trace) { - trans_scope = neu_otel_add_span(trans_trace); - neu_otel_scope_set_span_name(trans_scope, "mqtt publish"); char new_span_id[36] = { 0 }; neu_otel_new_span_id(new_span_id); - neu_otel_scope_set_span_id(trans_scope, new_span_id); - uint8_t *p_sp_id = neu_otel_scope_get_pre_span_id(trans_scope); - if (p_sp_id) { - neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, 8); - } + trans_scope = + neu_otel_add_span2(trans_trace, "mqtt publish", new_span_id); neu_otel_scope_add_span_attr_int(trans_scope, "thread id", (int64_t)(pthread_self())); neu_otel_scope_set_span_start_time(trans_scope, neu_time_ms()); @@ -661,15 +656,9 @@ int handle_trans_data(neu_plugin_t * plugin, neu_mqtt_qos_e qos = plugin->config.qos; if (plugin->config.version == NEU_MQTT_VERSION_V5 && trans_trace) { - neu_otel_scope_add_span_attr_string(trans_scope, "playload", - json_str); rv = publish_with_trace(plugin, qos, topic, json_str, strlen(json_str), trace_parent); } else { - if (trans_trace) { - neu_otel_scope_add_span_attr_string(trans_scope, "playload", - json_str); - } rv = publish(plugin, qos, topic, json_str, strlen(json_str)); } @@ -677,7 +666,7 @@ int handle_trans_data(neu_plugin_t * plugin, } while (0); if (trans_trace) { - neu_otel_scope_add_span_attr_int(trans_scope, "error", rv); + neu_otel_scope_add_span_attr_int(trans_scope, "rv", rv); neu_otel_scope_set_span_end_time(trans_scope, neu_time_ms()); neu_otel_trace_set_final(trans_trace); } diff --git a/plugins/mqtt/mqtt_plugin_intf.c b/plugins/mqtt/mqtt_plugin_intf.c index 2680e8bad..427bf1a6a 100644 --- a/plugins/mqtt/mqtt_plugin_intf.c +++ b/plugins/mqtt/mqtt_plugin_intf.c @@ -537,7 +537,7 @@ int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, } if (trace) { - neu_otel_scope_add_span_attr_int(scope, "handle error", error); + neu_otel_scope_add_span_attr_int(scope, "rv", error); neu_otel_scope_set_span_end_time(scope, neu_time_ms()); neu_otel_trace_set_final(trace); } diff --git a/src/adapter/driver/driver.c b/src/adapter/driver/driver.c index 090e475b0..d41408528 100644 --- a/src/adapter/driver/driver.c +++ b/src/adapter/driver/driver.c @@ -1987,15 +1987,10 @@ static int report_callback(void *usr_data) if (neu_otel_data_is_started() && data->trace_ctx) { trans_trace = neu_otel_find_trace(data->trace_ctx); if (trans_trace) { - trans_scope = neu_otel_add_span(trans_trace); - neu_otel_scope_set_span_name(trans_scope, "report cb"); char new_span_id[36] = { 0 }; neu_otel_new_span_id(new_span_id); - neu_otel_scope_set_span_id(trans_scope, new_span_id); - uint8_t *p_sp_id = neu_otel_scope_get_pre_span_id(trans_scope); - if (p_sp_id) { - neu_otel_scope_set_parent_span_id2(trans_scope, p_sp_id, 8); - } + trans_scope = + neu_otel_add_span2(trans_trace, "report cb", new_span_id); neu_otel_scope_add_span_attr_int(trans_scope, "thread id", (int64_t)(pthread_self())); neu_otel_scope_set_span_start_time(trans_scope, neu_time_ms()); diff --git a/src/otel/otel_manager.c b/src/otel/otel_manager.c index 15047547c..8ee6e3d2a 100644 --- a/src/otel/otel_manager.c +++ b/src/otel/otel_manager.c @@ -96,6 +96,9 @@ static int hex_char_to_int(char c) static int hex_string_to_binary(const char * hex_string, unsigned char *binary_array, int max_length) { + if (hex_string == NULL) { + return -1; + } int length = strlen(hex_string); if (length % 2 != 0 || length <= 0) return -1; @@ -601,6 +604,84 @@ neu_otel_scope_ctx neu_otel_add_span(neu_otel_trace_ctx ctx) return scope; } +neu_otel_scope_ctx neu_otel_add_span2(neu_otel_trace_ctx ctx, + const char * span_name, + const char * span_id) +{ + trace_scope_t *scope = calloc(1, sizeof(trace_scope_t)); + trace_ctx_t * trace_ctx = (trace_ctx_t *) ctx; + pthread_mutex_lock(&trace_ctx->mutex); + scope->trace_ctx = ctx; + if (trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans == 0) { + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans = + calloc(1, sizeof(Opentelemetry__Proto__Trace__V1__Span *)); + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans[0] = + calloc(1, sizeof(Opentelemetry__Proto__Trace__V1__Span)); + + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans = 1; + scope->span = + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans[0]; + scope->span_index = 0; + } else { + Opentelemetry__Proto__Trace__V1__Span **t_spans = + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans; + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans = + calloc(1 + + trace_ctx->trace_data.resource_spans[0] + ->scope_spans[0] + ->n_spans, + sizeof(Opentelemetry__Proto__Trace__V1__Span *)); + for (size_t i = 0; i < + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans; + i++) { + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans[i] = + t_spans[i]; + } + + free(t_spans); + + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->spans + [trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans] = + calloc(1, sizeof(Opentelemetry__Proto__Trace__V1__Span)); + scope->span = trace_ctx->trace_data.resource_spans[0] + ->scope_spans[0] + ->spans[trace_ctx->trace_data.resource_spans[0] + ->scope_spans[0] + ->n_spans]; + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans += 1; + + scope->span_index = + trace_ctx->trace_data.resource_spans[0]->scope_spans[0]->n_spans - + 1; + } + + opentelemetry__proto__trace__v1__span__init(scope->span); + scope->span->kind = + OPENTELEMETRY__PROTO__TRACE__V1__SPAN__SPAN_KIND__SPAN_KIND_SERVER; + uint8_t *t_id = calloc(1, 16); + hex_string_to_binary((char *) trace_ctx->trace_id, t_id, 16); + scope->span->trace_id.data = t_id; + scope->span->trace_id.len = 16; + scope->span->flags = trace_ctx->flags; + neu_otel_scope_set_span_name(scope, span_name); + neu_otel_scope_set_span_id(scope, span_id); + if (scope->span_index != 0) { + neu_otel_scope_set_parent_span_id2( + scope, + trace_ctx->trace_data.resource_spans[0] + ->scope_spans[0] + ->spans[scope->span_index - 1] + ->span_id.data, + trace_ctx->trace_data.resource_spans[0] + ->scope_spans[0] + ->spans[scope->span_index - 1] + ->span_id.len); + } + utarray_push_back(trace_ctx->scopes, &scope); + pthread_mutex_unlock(&trace_ctx->mutex); + return scope; +} + void neu_otel_scope_set_parent_span_id(neu_otel_scope_ctx ctx, const char * parent_span_id) {