Skip to content

Instantly share code, notes, and snippets.

@dalelane
Last active September 20, 2023 02:50
Show Gist options
  • Save dalelane/3931c17b14c51fa4a1cf25496237d188 to your computer and use it in GitHub Desktop.
Save dalelane/3931c17b14c51fa4a1cf25496237d188 to your computer and use it in GitHub Desktop.
Describing Kafka schema usage using AsyncAPI

Describing Kafka schema usage using AsyncAPI

I've picked a dozen scenarios to illustrate the range of things that I think AsyncAPI needs to be able to describe. My goal is to have enough information in the AsyncAPI spec for a developer writing an app to consume messages from the topic.

Updated following feedback in Slack and Github - see revisions to see what has changed

also:


Binary-encoded Avro

Scenario

Messages on a Kafka topic are being produced using a GenericDatumWriter with binary encoding.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            key:
              type: string
        payload:
          $ref: "my-avro-schema.avsc"

or (optionally, explicitly specifying that a schema registry was not used)

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "none"
            key:
              type: string
        payload:
          $ref: "my-avro-schema.avsc"

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

JSON-encoded Avro

Scenario

Messages on a Kafka topic were produced using a GenericDatumWriter with JSON encoding.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, explicitly specifying that a schema registry was not used)

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "none"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

Confluent using schema registry

Scenario

Messages on a Kafka topic were produced using a Confluent serdes library.

The Confluent schema registry is available for use by consuming applications.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.get("field-from-your-schema");
}

Confluent

Scenario

Messages on a Kafka topic were produced using a Confluent serdes library.

The Confluent schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "payload"
          key:
            type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

    // skip the first five bytes that contain the "magic" byte and 4 byte global ID
    bias.skip(5);

    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

Apicurio using binary encoding

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.

The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

Apicurio using binary encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use binary encoding.

The Apicurio schema registry is available for use by consuming applications.

The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
        headers:
          type: object
          properties:
            apicurio.value.encoding:
              type: string
              enum:
              - "BINARY"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.get("field-from-your-schema");
}

Apicurio using JSON encoding

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.

The Apicurio schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

Apicurio using JSON encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Apicurio serdes library configured to use JSON encoding.

The Apicurio schema registry is available for use by consuming applications.

The AvroKafkaDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "apicurio"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml
        headers:
          type: object
          properties:
            apicurio.value.encoding:
              type: string
              enum:
              - "JSON"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.get("field-from-your-schema");
}

Event Streams using binary encoding

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.

The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

Event Streams using binary encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use binary encoding.

The Event Streams schema registry is available for use by consuming applications.

The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/octet-stream"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
              - "BINARY"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.get("field-from-your-schema");
}

Event Streams using JSON encoding

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.

The Event Streams schema registry is not available for use by consuming applications, which should be implemented based on the AsyncAPI spec alone.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

or (optionally, describing the Schema Registry that was used to produce messages for information, even if it is not available for use by consumers)

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: false
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File("my-avro-schema.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    byte[] bytes = record.value();

    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    Decoder decoder = DecoderFactory.get().jsonDecoder(bais, null);
    GenericRecord record = reader.read(null, decoder);

    String somefield = record.get("field-from-the-schema");
}

Event Streams using JSON encoding using schema registry

Scenario

Messages on a Kafka topic were produced using an Event Streams serdes library configured to use JSON encoding.

The Event Streams schema registry is available for use by consuming applications.

The EventStreamsDeserializer class knows what headers contain the schema information needed to deserialize messages, so in some ways it is redundant to specify this in the AsyncAPI spec. I've included it because some developers will write applications in programming languages where a supported serdes class isn't available, so fully describing the contents of the topic still seems worthwhile.

How this could be described in AsyncAPI

asyncapi: "2.0.0"
...
servers:
  prod:
    ...
    bindings:
      kafka:
        schemaRegistryUrl: "https://my-schema-registry.com"
        schemaRegistryVendor: "ibm"
        schemaRegistryAvailable: true
...
channels:
  my.topic.name:
    ...
    subscribe:
      ...
      message:
        schemaFormat: "application/vnd.apache.avro;version=1.9.0"
        contentType: "application/json"
        traits:
          - $ref: https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml
        headers:
          type: object
          properties:
            com.ibm.eventstreams.schemaregistry.encoding:
              type: string
              enum:
              - "JSON"
        bindings:
          kafka:
            schemaIdLocation: "header"
            key:
              type: string
        payload:
          $ref: my-avro-schema.avsc

How a Java developer could consume messages from this spec

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.ibm.eventstreams.serdes.EventStreamsDeserializer");
props.put("schema.registry.url", "https://my-schema-registry.com");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my.topic.name"));

ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, GenericRecord> record : records) {
    String somefield = record.get("field-from-your-schema");
}

Message traits

Some common definitions of message headers based on well-known serdes libraries

Apicurio

e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/apicurio-v0.0.1.yaml

name: apicurio
summary: Message headers used by Apicurio serdes library
headers:
  type: object
  properties:
    apicurio.value.globalId:
      type: string
    apicurio.value.version:
      type: integer
    apicurio.value.encoding:
      type: string
      enum:
      - "BINARY"
      - "JSON"

IBM Event Streams

e.g. https://github.com/asyncapi/bindings/raw/master/kafka/examples/traits/ibm-v0.0.1.yaml

name: ibm-eventstreams
summary: Message headers used by IBM Event Streams serdes library
headers:
  type: object
  properties:
    com.ibm.eventstreams.schemaregistry.schema.id:
      type: string
    com.ibm.eventstreams.schemaregistry.schema.version:
      type: integer
    com.ibm.eventstreams.schemaregistry.encoding:
      type: string
      enum:
      - "BINARY"
      - "JSON"

@M3lkior
Copy link

M3lkior commented Sep 3, 2021

Hey, very nice proposal, Dale,

I think that the binding approach on server to document schema registry is pertinent, but I see some limitation

  • About the security part : How to document the way to connect to a restricted schema registry ? Your proposal make sense if the Schema registry security is the same that the attached server but what about different creds ?
  • How to deal with additional properties if needed ?

Maybe we can describe the schema registry part in a dedicated server bloc with the http protocol like :

servers:
  dev:
    url: "https://my-schema-registry.com"
    description: My schema registry description
    protocol: https
    protocolVersion: '2.0'

    security:
      - creds: []
components:
 securitySchemes:
   creds:
      type: userPassword
      description: |
        Provide your Service Account API key / secret.

But we have a constraint with the unique server key: we can not have a dev key for the kafka cluster and a devkey for the registry

Any feedback ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment