Custom serializers | Laravel Kafka

Custom serializers
==================

Serialization is the process of converting messages to bytes. Deserialization is the inverse process: converting a stream of bytes back into an object. In a nutshell, it transforms the content into readable, interpretable information.


To prepare a message for transmission, the producer runs it through a serializer. This package supports three serializer/deserializer pairs out of the box:

- NullSerializer / NullDeserializer
- JsonSerializer / JsonDeserializer
- AvroSerializer / AvroDeserializer
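
For example, to attach one of the built-in serializers to a producer explicitly (the `Junges\Kafka\Message\Serializers` namespace is inferred from the AVRO example below):

```php
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Serializers\JsonSerializer;

// Explicitly use the built-in JSON serializer for this producer.
$producer = Kafka::publish('broker')
    ->onTopic('topic')
    ->usingSerializer(new JsonSerializer());
```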

If the default `JsonSerializer` does not fulfill your needs, you can make use of custom serializers.

To create a custom serializer, create a class that implements the `\Junges\Kafka\Contracts\MessageSerializer` contract. This interface requires you to declare the `serialize` method.

You can inform your producer which serializer should be used with the `usingSerializer` method:

```
$producer = \Junges\Kafka\Facades\Kafka::publish('broker')
    ->onTopic('topic')
    ->usingSerializer(new MyCustomSerializer());
```
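
A custom serializer might look like this. This is a sketch: the class name and the use of PHP's native `serialize()` for the body are illustrative, and it assumes the contract's `serialize` method receives and returns a `KafkaProducerMessage`:

```php
use Junges\Kafka\Contracts\KafkaProducerMessage;
use Junges\Kafka\Contracts\MessageSerializer;

class MyCustomSerializer implements MessageSerializer
{
    // Receives the outgoing message and returns it with a serialized body.
    public function serialize(KafkaProducerMessage $message): KafkaProducerMessage
    {
        // Illustrative: replace with your own wire format (XML, MessagePack, etc.).
        $body = serialize($message->getBody());

        return $message->withBody($body);
    }
}
```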


### Using AVRO serializer

To use the AVRO serializer, configure a schema registry and build the serializer as follows:

```
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
use Junges\Kafka\Message\Registry\AvroSchemaRegistry;

$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

// If no version is defined, the latest version will be used.
// If no schema definition is defined, the appropriate version will be fetched from the registry.
$registry->addBodySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
);

$serializer = new \Junges\Kafka\Message\Serializers\AvroSerializer($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);

$producer = \Junges\Kafka\Facades\Kafka::publish('broker')
    ->onTopic('topic')
    ->usingSerializer($serializer);
```
