
Schema resolution slowing down encoding #117

Open
baltendo opened this issue Oct 13, 2024 · 7 comments


baltendo commented Oct 13, 2024

Describe the bug
We make heavy use of Kafka, Avro, and the Schema Registry with Java. I now wanted to implement a service in Rust. The service runs fine, but producing a message is very slow, and I found schema resolution to be the slow part. I read up on schema resolution, and I wonder why it is called during encoding. As far as I understood, it is needed during decoding, when the reader schema differs from the one used during encoding.

We use a quite big schema with many records that are used multiple times, so after their first definition they become named references. Unfortunately, I cannot just attach the schema. It is already mentioned in avro-rs that this path is slow:

[screenshot: avro-rs source with a comment noting that this resolution path is slow]

To Reproduce
Steps to reproduce the behavior:
Call EasyAvroEncoder.encode_struct() with a schema containing many named references.

Here the .resolve() method is called and I don't understand why (see comment):

pub(crate) fn item_to_bytes(
    avro_schema: &AvroSchema,
    item: impl Serialize,
) -> Result<Vec<u8>, SRCError> {
    match to_value(item)
        .map_err(|e| {
            SRCError::non_retryable_with_cause(e, "Could not transform to apache_avro value")
        })
        // not sure why schema resolution should happen on serialization/writing
        .map(|r| r.resolve(&avro_schema.parsed))
    {
        Ok(Ok(v)) => to_bytes(avro_schema, v),
        Ok(Err(e)) => Err(SRCError::non_retryable_with_cause::<SRCError>(e, "Failed to resolve")),
        Err(e) => Err(e),
    }
}
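
For context, here is a minimal sketch of datum encoding with apache_avro directly, without the resolve step (to_avro_datum and to_value are public apache_avro APIs; everything else is made up for illustration). Note that this produces a bare Avro datum without the Confluent wire-format header that to_bytes adds, and whether it works without resolution depends on how closely the serde-produced Value matches the schema, unions in particular:

use apache_avro::{to_avro_datum, to_value, AvroResult, Schema};
use serde::Serialize;

// Sketch only: encode a value against a schema without calling Value::resolve.
// to_avro_datum validates the value before encoding; the resolve step in
// item_to_bytes is what coerces serde output (union branches, named
// references) onto the schema, so skipping it may fail for values that do
// not already match the schema exactly.
fn encode_unresolved(schema: &Schema, item: impl Serialize) -> AvroResult<Vec<u8>> {
    let value = to_value(item)?;
    to_avro_datum(schema, value)
}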

I tried to write a test. The Child struct can be duplicated to get more named references:

    #[test]
    fn named() {
        #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
        pub struct Parent {
            #[serde(rename = "child1")]
            pub child1: Option<Child>,
            #[serde(rename = "child2")]
            pub child2: Option<Child>,
            #[serde(rename = "child3")]
            pub child3: Option<Child>,
        }

        #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
        pub struct Child {
            #[serde(rename = "name")]
            pub name: Option<String>,
        }

        let writer_schema = r#"{
  "type": "record",
  "name": "Parent",
  "fields": [
    {
      "name": "child1",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Child",
          "fields": [
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      ]
    },
    {
      "name": "child2",
      "type": [
        "null",
        "Child"
      ],
      "default": null
    },
    {
      "name": "child3",
      "type": [
        "null",
        "Child"
      ],
      "default": null
    }
  ]
}"#;

        let schema = AvroSchema {
            id: 6,
            raw: String::from(writer_schema),
            parsed: Schema::parse_str(writer_schema).unwrap(),
        };

        let item = Parent {
            child1: Some(Child { name: Some("child1".to_string()) }),
            child2: Some(Child { name: Some("child2".to_string()) }),
            child3: Some(Child { name: None }),
        };

        let now = Instant::now();
        let result = crate::avro_common::item_to_bytes(&schema, item);
        let elapsed = now.elapsed();
        println!("writing took: {:.2?}", elapsed);
        let bytes = result.unwrap();

        assert_eq!(bytes.len(), 25);
    }

Here is a screenshot of the running service from the IDE, with additional "Sending" and "Sent" logs around the EasyAvroEncoder.encode_struct() call plus .await():

[screenshot: service logs showing the elapsed time between "Sending" and "Sent"]

Expected behavior
I expect it to be faster. When I remove all the data related to named references (possible because I have many nullable fields), it is much faster. The following screenshot first shows the sending of a big event with many named references, then a small event with no named references:

[screenshot: timing of a big event with many named references, followed by a small event without any]

Options

  • Is it possible to just avoid the schema resolution here?
  • Is it possible to check whether the writer and reader schemas differ, and only do schema resolution when needed? (See the sketch after this list.)
  • Is it possible to speed up the schema resolution itself (but this would be a change in avro-rs)?
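
For the second option, a cheap structural comparison could use apache_avro's parsing canonical form (a sketch; schemas_match is a made-up helper, and the registry schema would have to be fetched separately):

use apache_avro::Schema;

// Sketch: resolution could be skipped when the local writer schema and the
// registry schema have the same parsing canonical form.
fn schemas_match(local: &Schema, registry: &Schema) -> bool {
    local.canonical_form() == registry.canonical_form()
}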

gklijs (Owner) commented Oct 14, 2024

Can you make sure it's not just the first call that is slow? Subsequent calls should be a lot faster because of the cache.
I think all three options are out of scope for this library. If I understand correctly, with the first two you want to bypass the schema registry, in which case this library is just overhead and it's much better to use avro-rs directly. Or speed things up, which is the third point. Maybe you can make a flame graph to see where it's slow; there might be a quick win.
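
One way to rule out a first-call effect is to warm up once and then time repeated encodes (a sketch reusing the schema and item from the test above; the test's original item_to_bytes(&schema, item) call would need item.clone() as well):

    // Warm-up call, so later timings are not skewed by one-time work.
    let _ = crate::avro_common::item_to_bytes(&schema, item.clone());
    for i in 0..5 {
        let start = Instant::now();
        let _ = crate::avro_common::item_to_bytes(&schema, item.clone()).unwrap();
        println!("encode #{}: {:.2?}", i, start.elapsed());
    }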


martin-g commented Oct 14, 2024

Hi,

Yesterday I read https://medium.com/@saiharshavellanki/building-a-blazing-fast-kafka-ingestion-pipeline-in-rust-with-protobuf-1cdc2f768c5f by @saiharshavellanki. It says:

Sync vs. Async Decoding: By adding metrics, we identified the main bottleneck in the decoding process. The sync module of the schema_registry_converter crate doesn't cache schemas, leading to performance issues. Switching to the async module, which includes built-in caching, resolved this and significantly improved throughput.

Could that be the issue?


gklijs (Owner) commented Oct 14, 2024

Interesting. The sync module should also do some caching, but it might be before actually parsing the schema.

baltendo (Author) commented

Yes, I can confirm it's not just the first call that is slow; the first event was not considered in the screenshots.

baltendo (Author) commented

I use the EasyAvroDecoder, so I am already using the async implementation that the article claims is faster.

saiharshavellanki commented

@baltendo the article talks about the protobuf decoder, not the AvroDecoder.


gklijs (Owner) commented Oct 16, 2024

It's been a while since I wrote the code. I think there could be an additional option to bypass resolving, which could be used if you are sure the schema you are using for producing the data is exactly the same as the schema in the schema registry. Does that sound like something that makes sense and would give additional performance?
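
A hypothetical sketch of what such a bypass could look like in item_to_bytes (the function name and the skip_resolution flag are made up, not part of the crate):

pub(crate) fn item_to_bytes_with_options(
    avro_schema: &AvroSchema,
    item: impl Serialize,
    skip_resolution: bool,
) -> Result<Vec<u8>, SRCError> {
    let value = to_value(item).map_err(|e| {
        SRCError::non_retryable_with_cause(e, "Could not transform to apache_avro value")
    })?;
    // Only safe to skip when the serde-produced value already matches the
    // registry schema exactly (union branches and named references included).
    let value = if skip_resolution {
        value
    } else {
        value
            .resolve(&avro_schema.parsed)
            .map_err(|e| SRCError::non_retryable_with_cause(e, "Failed to resolve"))?
    };
    to_bytes(avro_schema, value)
}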
