I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is for the AsyncAPI spec to contain enough information for a developer to write an app that consumes messages from the topic.
Updated following feedback in Slack and GitHub - see the revision history for what has changed
- Binary-encoded Avro, no schema registry
- JSON-encoded Avro, no schema registry
- Confluent serdes, schema registry available to consumers
- Confluent serdes, schema registry not available to consumers
- Apicurio serdes, binary encoding, schema registry not available
- Apicurio serdes, binary encoding, schema registry available
- Apicurio serdes, JSON encoding, schema registry not available
- Apicurio serdes, JSON encoding, schema registry available
- Event Streams serdes, binary encoding, schema registry not available
- Event Streams serdes, binary encoding, schema registry available
- Event Streams serdes, JSON encoding, schema registry not available
- Event Streams serdes, JSON encoding, schema registry available
also: common definitions of message headers for well-known serdes libraries (at the end of this proposal)
Messages on a Kafka topic were produced using a GenericDatumWriter with binary encoding.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            key:
              type: string
        payload:
          $ref: "my-avro-schema.avsc"
or (optionally, explicitly specifying that a schema registry was not used)
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "none"
            key:
              type: string
        payload:
          $ref: "my-avro-schema.avsc"
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
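For context, a minimal sketch of the producing side described above (a GenericDatumWriter with a binary Encoder). The producer itself isn't something the AsyncAPI document needs to describe; the field value, message key and serializer choices here are illustrative only.

// Sketch only: how messages like these are typically produced with a GenericDatumWriter
Schema schema = new Schema.Parser().parse(new File("my-avro-schema.avsc"));

GenericRecord value = new GenericData.Record(schema);
value.put("field-from-the-schema", "some value");

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
writer.write(value, encoder);
encoder.flush();

// sent with StringSerializer for the key and ByteArraySerializer for the value
producer.send(new ProducerRecord<String, byte[]>("my.topic.name", "some-key", baos.toByteArray()));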
Messages on a Kafka topic were produced using a GenericDatumWriter with JSON encoding.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
or (optionally, explicitly specifying that a schema registry was not used)
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "none"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
Messages on a Kafka topic were produced using a Confluent serdes library.
The Confluent schema registry is available for use by consuming applications.
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
Messages on a Kafka topic were produced using a Confluent serdes library.
The Confluent schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    // skip the first five bytes: the "magic" byte and the 4-byte schema ID
    bais.skip(5);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.
The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.
The Apicurio schema registry is available for use by consuming applications.
The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
        headers:
          type: object
          properties:
            apicurio.value.encoding:
              type: string
              enum:
                - "BINARY"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
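The point about developers without a supported serdes class is worth illustrating. The sketch below is not part of the proposal: the header names come from the trait definition at the end of this issue, and the decoding logic is one assumption about how a consumer could combine those headers with the schema file referenced by the spec. The same approach would apply to the Event Streams scenarios further down, using the com.ibm.eventstreams.schemaregistry.* header names instead.

// Sketch only: consuming without the Apicurio serdes classes, driven by the
// documented message headers and the schema file referenced in the AsyncAPI spec.
// "records" is obtained from a KafkaConsumer<String, byte[]> as in the earlier snippets.
Schema schema = new Schema.Parser().parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

for (ConsumerRecord<String, byte[]> record : records) {
    // "BINARY" or "JSON", as described by the apicurio trait
    Header encodingHeader = record.headers().lastHeader("apicurio.value.encoding");
    String encoding = encodingHeader == null
        ? "BINARY"
        : new String(encodingHeader.value(), StandardCharsets.UTF_8);

    ByteArrayInputStream bais = new ByteArrayInputStream(record.value());
    Decoder decoder = "JSON".equals(encoding)
        ? DecoderFactory.get().jsonDecoder(schema, bais)
        : DecoderFactory.get().binaryDecoder(bais, null);

    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}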
Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.
The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.
The Apicurio schema registry is available for use by consuming applications.
The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
        headers:
          type: object
          properties:
            apicurio.value.encoding:
              type: string
              enum:
                - "JSON"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.
The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.
The Event Streams schema registry is available for use by consuming applications.
The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
                - "BINARY"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.
The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.
asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, bais);
    GenericRecord value = reader.read(null, decoder);
    String somefield = value.get("field-from-the-schema").toString();
}
Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.
The Event Streams schema registry is available for use by consuming applications.
The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.
asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
                - "JSON"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc
// props also contains the usual connection config (bootstrap.servers, group.id, etc.)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.value().get("field-from-your-schema").toString();
}
Some common definitions of message headers based on well-known serdes libraries
e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
name: apicurio
summary: Message headers used by Apicurio serdes library
headers:
  type: object
  properties:
    apicurio.value.globalId:
      type: string
    apicurio.value.version:
      type: integer
    apicurio.value.encoding:
      type: string
      enum:
        - "BINARY"
        - "JSON"
e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
name: ibm-eventstreams
summary: Message headers used by IBM Event Streams serdes library
headers:
  type: object
  properties:
    com.ibm.eventstreams.schemaregistry.schema.id:
      type: string
    com.ibm.eventstreams.schemaregistry.schema.version:
      type: integer
    com.ibm.eventstreams.schemaregistry.encoding:
      type: string
      enum:
        - "BINARY"
        - "JSON"
Hey, very nice proposal, Dale.

I think that the binding approach on server to document the schema registry is pertinent, but I see some limitations on the security part: how do we document the way to connect to a restricted schema registry? Your proposal makes sense if the schema registry security is the same as the attached server's, but what about different creds?

Maybe we could describe the schema registry in a dedicated server block with the http protocol. But we have a constraint with the unique server key: we cannot have a dev entry for the Kafka cluster and another dev entry for the registry.

Any feedback?
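Purely as an illustration of that idea (a hypothetical sketch, not part of the proposal above; the server names and security scheme names are made up), the schema registry could appear as its own server entry:

# hypothetical sketch only: the registry as a dedicated server with its own credentials
servers:
  prod:
    url: my-kafka-cluster:9092
    protocol: kafka
    security:
      - kafkaCreds: []
  prod-schema-registry:
    url: https://my-schema-registry.com
    protocol: http
    security:
      - registryCreds: []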