Custom deserializers
To create a custom deserializer, you need to create a class that implements the \Junges\Kafka\Contracts\MessageDeserializer
contract.
This interface force you to declare the deserialize
method.
Support Laravel Kafka by sponsoring me!
Do you find Laravel Kafka valuable and wanna support its development?
Laravel Kafka is free and Open Source software, built to empower developers like you. Your support helps maintain and enhance the project. If you find it valuable, please consider sponsoring me on GitHub. Every contribution makes a difference and keeps the development going strong! Thank you!
To set the deserializer you want to use, use the usingDeserializer
method:
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->usingDeserializer(new MyCustomDeserializer());
Hot tip!
Using AVRO deserializer
To use the AVRO deserializer on your consumer, add the Avro deserializer:
use FlixTech\AvroSerializer\Objects\RecordSerializer; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use GuzzleHttp\Client; $cachedRegistry = new CachedRegistry( new BlockingRegistry( new PromisingRegistry( new Client(['base_uri' => 'kafka-schema-registry:9081']) ) ), new AvroObjectCacheAdapter() ); $registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry); $recordSerializer = new RecordSerializer($cachedRegistry); //if no version is defined, latest version will be used //if no schema definition is defined, the appropriate version will be fetched form the registry $registry->addBodySchemaMappingForTopic( 'test-topic', new \Junges\Kafka\Message\KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */) ); $registry->addKeySchemaMappingForTopic( 'test-topic', new \Junges\Kafka\Message\KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */) ); // if you are only decoding key or value, you can pass that mode as additional third argument // per default both key and body will get decoded $deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */); $consumer = \Junges\Kafka\Facades\Kafka::consumer()->usingDeserializer($deserializer);