Schema References is a feature introduced in Confluent Platform 5.5 that
lets you mix different types in the same topic while still being able to
enforce types and validate data when using the TopicNameStrategy.
This sample repository contains the example, source code and configuration described in this great post by @rayokota.
Start the containers:
docker compose up -d
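Schema Registry may need a few seconds after `docker compose up -d` before it accepts requests, and registering schemas too early fails. The following is a minimal sketch of a wait loop. It assumes Schema Registry listens on http://localhost:8081, as in the other commands in this README. `wait_for_url` is a hypothetical helper and is not part of this repository.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll a URL until it answers with a 2xx/3xx status.
# Args: URL, max attempts (default 30), delay in seconds (default 2).
wait_for_url() {
  local url="$1" attempts="${2:-30}" delay="${3:-2}" i
  for ((i = 1; i <= attempts; i++)); do
    # -f: treat HTTP errors as failures; -sS: quiet except for errors.
    if curl -fsS -o /dev/null "$url"; then
      return 0
    fi
    sleep "$delay"
  done
  echo "timed out waiting for $url" >&2
  return 1
}
```

For example, `wait_for_url http://localhost:8081/subjects && mvn schema-registry:register` registers the schemas only once the registry responds.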
Then register the schemas with the Schema Registry Maven plugin:
mvn schema-registry:register
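The Maven plugin registers the customer and product records plus the all-types-value union that references them. The sketch below shows what the union registration might look like through the Schema Registry REST API instead of the plugin. It assumes that the customer and product subjects already exist at version 1 and that the registry listens on localhost:8081. The reference names mirror the registry output in this example. `all_types_payload` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the request body for registering the union
# under all-types-value with references to the customer and product subjects.
all_types_payload() {
  # schemaType is omitted, so Schema Registry treats the schema as Avro.
  cat <<'EOF'
{
  "schema": "[\"org.matias.Customer\",\"org.matias.Product\"]",
  "references": [
    { "name": "io.confluent.examples.avro.Customer", "subject": "customer", "version": 1 },
    { "name": "io.confluent.examples.avro.Product", "subject": "product", "version": 1 }
  ]
}
EOF
}

# Usage (requires the running stack):
# all_types_payload | curl -s -X POST \
#   -H "Content-Type: application/vnd.schemaregistry.v1+json" \
#   --data @- http://localhost:8081/subjects/all-types-value/versions
```

The registry responds with the ID of the registered schema. If an identical schema is already registered under that subject, it returns the existing ID.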
From the output of mvn schema-registry:register, capture the schema ID registered for the all-types-value subject. You will need that value for the producer application. In the example execution below, the value is 3. You can also get that ID by running curl -XGET http://localhost:8081/subjects/all-types-value/versions/1
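You can use a small filter to pull the ID out of that response instead of copying it by hand. This is a minimal sketch that uses sed, so it works without jq. If jq is installed, `jq -r .id` does the same. `extract_schema_id` is a hypothetical helper. It assumes its input is the subject-version JSON returned by the command above, either compact or pretty-printed.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a Schema Registry subject-version response on
# stdin and print its top-level "id" value.
extract_schema_id() {
  # Escaped keys inside the "schema" string (\"id\") do not match "id"":",
  # so only the real top-level field is captured.
  sed -n 's/.*"id"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}
```

For example: `SCHEMA_ID=$(curl -s http://localhost:8081/subjects/all-types-value/versions/1 | extract_schema_id)`.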
To inspect what was registered, list the subjects and then fetch each version:
curl -XGET http://localhost:8081/subjects | jq
curl -XGET http://localhost:8081/subjects/product/versions/1 | jq
{
  "subject": "product",
  "version": 1,
  "id": 2,
  "schema": "{\"type\":\"record\",\"name\":\"Product\",\"namespace\":\"org.matias\",\"fields\":[{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"product_name\",\"type\":\"string\"},{\"name\":\"product_price\",\"type\":\"double\"}]}"
}
curl -XGET http://localhost:8081/subjects/customer/versions/1 | jq
{
  "subject": "customer",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"org.matias\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"customer_name\",\"type\":\"string\"},{\"name\":\"customer_email\",\"type\":\"string\"},{\"name\":\"customer_address\",\"type\":\"string\"}]}"
}
curl -XGET http://localhost:8081/subjects/all-types-value/versions/1 | jq
{
  "subject": "all-types-value",
  "version": 1,
  "id": 3,
  "references": [
    {
      "name": "io.confluent.examples.avro.Customer",
      "subject": "customer",
      "version": 1
    },
    {
      "name": "io.confluent.examples.avro.Product",
      "subject": "product",
      "version": 1
    }
  ],
  "schema": "[\"org.matias.Customer\",\"org.matias.Product\"]"
}
In one shell, start a consumer on the all-types topic:
docker compose exec schema-registry kafka-avro-console-consumer \
--bootstrap-server broker:9092 \
--topic all-types
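In another shell, start a producer that serializes values with the top-level schema ID captured earlier:
# <top-level-id> is 3 in this example
docker compose exec schema-registry kafka-avro-console-producer \
--bootstrap-server broker:9092 \
--topic all-types \
--property value.schema.id=<top-level-id> \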
--property auto.register=false \
--property use.latest.version=true
Then type the following records at the producer prompt, one per line:
{ "org.matias.Product": { "product_id": 1, "product_name" : "rice", "product_price" : 100.00 } }
{ "org.matias.Customer": { "customer_id": 100, "customer_name": "acme", "customer_email": "[email protected]", "customer_address": "1 Main St" } }
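Typing records at the prompt works for a demo. To replay them from a file instead, a sketch along these lines could pipe them into the same producer. It assumes a hypothetical records.json with one record per line in the format above. It uses `docker compose exec -T`, which disables pseudo-TTY allocation so that stdin can be redirected. `produce_records` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: forward Avro-JSON records (one per line on stdin)
# to the Avro console producer for the all-types topic.
produce_records() {
  local schema_id="$1"  # top-level schema ID, e.g. 3 in this example
  if [ -z "$schema_id" ]; then
    echo "usage: produce_records <top-level-id> < records.json" >&2
    return 2
  fi
  # -T: no TTY, so the function's stdin flows into the producer.
  docker compose exec -T schema-registry kafka-avro-console-producer \
    --bootstrap-server broker:9092 \
    --topic all-types \
    --property "value.schema.id=${schema_id}" \
    --property auto.register=false \
    --property use.latest.version=true
}
```

For example: `produce_records 3 < records.json`.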
You can also try to send a record whose type is not part of the union, and the producer will fail:
{ "org.matias.NonExistent": { "field" : 10} }
org.apache.kafka.common.errors.SerializationException: Error deserializing json { "org.matias.NonExistent": { "field" : 10} } to Avro of schema [{"type":"record","name":"Customer","namespace":"org.matias","fields":[{"name":"customer_id","type":"int"},{"name":"customer_name","type":"string"},{"name":"customer_email","type":"string"},{"name":"customer_address","type":"string"}]},{"type":"record","name":"Product","namespace":"org.matias","fields":[{"name":"product_id","type":"int"},{"name":"product_name","type":"string"},{"name":"product_price","type":"double"}]}]
Caused by: org.apache.avro.AvroTypeException: Unknown union branch org.matias.NonExistent
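In this JSON input, each union value is wrapped in an object whose only key is the full name of the branch. That is why an unknown key fails with Unknown union branch. The following is a minimal client-side pre-check, not something the console producer does itself. It assumes the branch list is hard-coded to the two records in the registered union. `union_branch` and `is_known_branch` are hypothetical helpers, and the sed-based key extraction only handles one record per line in the layout used here.

```shell
#!/usr/bin/env bash
# Branches of the registered union ["org.matias.Customer","org.matias.Product"].
ALLOWED_BRANCHES="org.matias.Customer org.matias.Product"

# Print the first quoted key after the opening brace, e.g. org.matias.Product.
union_branch() {
  sed -n 's/^[[:space:]]*{[[:space:]]*"\([^"]*\)".*/\1/p' <<<"$1"
}

# Succeed only if the record's branch name is one of the allowed branches.
is_known_branch() {
  local branch b
  branch="$(union_branch "$1")"
  for b in $ALLOWED_BRANCHES; do
    [ "$b" = "$branch" ] && return 0
  done
  return 1
}
```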
The second example, based on this demo, shows broker-side Schema Validation.
- Create the topic
docker compose exec broker kafka-topics --bootstrap-server broker:9092 --create --partitions 1 --replication-factor 1 --topic test-schemas
- Start a producer (using the default serializer)
docker compose exec broker kafka-console-producer --bootstrap-server broker:9092 --topic test-schemas --property parse.key=true --property key.separator=,
- Produce a sample record (1 is the key and my first record is the value; no schemas are enforced)
>1,my first record
- In another shell, create a consumer (you should see the event produced above)
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --from-beginning --topic test-schemas --property print.key=true
- Let's enable the validation (you should see "Completed updating config for topic test-schemas.")
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --alter --entity-type topics --entity-name test-schemas --add-config confluent.value.schema.validation=true
- Add a new record from the producer started in step 2.
>2,my second record
You will see an exception returned:
>[...] ERROR Error when sending message to topic test-schemas with key: 1 bytes, value: 16 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: Log record DefaultRecord(offset=0, timestamp=1671010385783, key=1 bytes, value=16 bytes) is rejected by the record interceptor io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator
This happens because Schema Validation is enabled and the records we are sending do not contain a schema ID, so the broker rejects them ("This record has failed the validation on broker").
- Let's now disable the validation (you should see "Completed updating config for topic test-schemas.")
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --alter --entity-type topics --entity-name test-schemas --add-config confluent.value.schema.validation=false
- Add a third record from the producer started in step 2.
>3,the third record
This one should be accepted as there is no validation on the broker side.
- Stop the consumer and the producer with Ctrl+C
- Stop the Docker containers and remove their volumes
docker compose down -v
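The rejection in the validation steps comes from the record format. Values written by Confluent serializers start with a magic byte (0x00) followed by a 4-byte big-endian schema ID, and broker-side validation checks that ID against Schema Registry. Plain strings such as my second record carry no such header. The following is a minimal sketch of the header check on a hex-encoded value. It is not the broker's actual validator, which also verifies the ID against the registry. `wire_schema_id` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: given a hex-encoded record value, print the schema ID
# if it starts with the Confluent wire-format header; fail otherwise.
wire_schema_id() {
  local hex id_hex
  hex="$(printf '%s' "$1" | tr 'A-F' 'a-f')"
  # Need the magic byte (2 hex chars) plus the 4-byte ID (8 hex chars).
  if [ "${#hex}" -lt 10 ] || [ "${hex:0:2}" != "00" ]; then
    return 1
  fi
  id_hex="${hex:2:8}"
  case "$id_hex" in
    *[!0-9a-f]*) return 1 ;;
  esac
  # Interpret the 4 bytes as a big-endian unsigned integer.
  echo "$((16#$id_hex))"
}
```

For example, `wire_schema_id "$(printf 'my second record' | od -An -tx1 | tr -d ' \n')"` fails, because the value starts with 0x6d ("m") rather than the magic byte.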