Custom Committers | Laravel Kafka

  Custom Committers
===================

By default, the consumer uses the committers provided by the `DefaultCommitterFactory`.

To set a custom committer on your consumer, add the committer via a factory that implements the `CommitterFactory` interface:

```
use Junges\Kafka\Config\Config;
use Junges\Kafka\Contracts\Committer;
use Junges\Kafka\Contracts\CommitterFactory;
use RdKafka\KafkaConsumer;
use RdKafka\Message;

class MyCommitter implements Committer
{
    public function commitMessage(Message $message, bool $success) : void {
        // ...
    }

    public function commitDlq(Message $message) : void {
        // ...
    }

    // Manual commit methods, also part of the Committer interface (see below).
    public function commit(mixed $messageOrOffsets = null) : void {
        // ...
    }

    public function commitAsync(mixed $messageOrOffsets = null) : void {
        // ...
    }
}

class MyCommitterFactory implements CommitterFactory
{
    public function make(KafkaConsumer $kafkaConsumer, Config $config) : Committer {
        // Build and return your committer here.
        return new MyCommitter();
    }
}

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->usingCommitterFactory(new MyCommitterFactory())
    ->build();
```

### Manual commit support

Custom committers support both automatic and manual commit operations. The `Committer` interface includes:

- `commitMessage(Message $message, bool $success): void` - Used for automatic commits
- `commitDlq(Message $message): void` - Used for dead letter queue commits
- `commit(mixed $messageOrOffsets = null): void` - Used for manual synchronous commits
- `commitAsync(mixed $messageOrOffsets = null): void` - Used for manual asynchronous commits

When handlers call `$consumer->commit()` or `$consumer->commitAsync()`, these calls are routed through your custom committer, ensuring consistent behavior across all commit types.
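The routing described above can be pictured with a small, self-contained sketch. Everything in it (`SketchCommitter`, `RecordingCommitter`, `SketchConsumer`) is a hypothetical stand-in invented for illustration, not one of the package's classes; it only shows the idea that a manual commit made from a handler is delegated to whichever committer the consumer was built with.

```php
<?php

// Hypothetical stand-ins: these are NOT the package's classes. They only
// illustrate manual commits being delegated to the configured committer.
interface SketchCommitter
{
    public function commit(mixed $messageOrOffsets = null): void;

    public function commitAsync(mixed $messageOrOffsets = null): void;
}

final class RecordingCommitter implements SketchCommitter
{
    /** @var list<string> Kind of each commit received, in call order. */
    public array $calls = [];

    public function commit(mixed $messageOrOffsets = null): void
    {
        $this->calls[] = 'sync';
    }

    public function commitAsync(mixed $messageOrOffsets = null): void
    {
        $this->calls[] = 'async';
    }
}

final class SketchConsumer
{
    public function __construct(private SketchCommitter $committer) {}

    // A manual commit is forwarded to the committer instead of being
    // handled by the consumer itself.
    public function commit(mixed $messageOrOffsets = null): void
    {
        $this->committer->commit($messageOrOffsets);
    }

    public function commitAsync(mixed $messageOrOffsets = null): void
    {
        $this->committer->commitAsync($messageOrOffsets);
    }
}

$committer = new RecordingCommitter();
$consumer = new SketchConsumer($committer);

// What a message handler might do once it has processed a message.
$handler = function (string $payload, SketchConsumer $consumer): void {
    $consumer->commit($payload);
};

$handler('order-created', $consumer);
$consumer->commitAsync();
```

Because both calls pass through the same object, any policy implemented in the committer (logging, filtering, retries) applies to manual and automatic commits alike.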

### Usage example

To define a new committer for your consumer, start by creating a class that implements the `Committer` interface. The `commitMessage` method receives a `$success` parameter, which is `true` for messages that were consumed without throwing an exception, and for messages whose exceptions were handled successfully by the consumer class. The following committer therefore commits only messages that were consumed successfully:

```
use Junges\Kafka\Contracts\Committer as CommitterContract;
use Junges\Kafka\Contracts\ConsumerMessage;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\TopicPartition;

class CustomCommitter implements CommitterContract
{
    public function __construct(private KafkaConsumer $consumer) {}

    public function commitMessage(Message $message, bool $success): void
    {
        // Skip the commit when the message was not processed successfully.
        if (! $success) {
            return;
        }

        $this->consumer->commit($message);
    }

    public function commitDlq(Message $message): void
    {
        $this->consumer->commit($message);
    }

    public function commit(mixed $messageOrOffsets = null): void
    {
        // Handle manual commits
        if ($messageOrOffsets instanceof ConsumerMessage) {
            // The committed offset is the position of the next message to read.
            $topicPartition = new TopicPartition(
                $messageOrOffsets->getTopicName(),
                $messageOrOffsets->getPartition(),
                $messageOrOffsets->getOffset() + 1
            );
            $messageOrOffsets = [$topicPartition];
        }

        $this->consumer->commit($messageOrOffsets);
    }

    public function commitAsync(mixed $messageOrOffsets = null): void
    {
        // Handle manual async commits
        if ($messageOrOffsets instanceof ConsumerMessage) {
            $topicPartition = new TopicPartition(
                $messageOrOffsets->getTopicName(),
                $messageOrOffsets->getPartition(),
                $messageOrOffsets->getOffset() + 1
            );
            $messageOrOffsets = [$topicPartition];
        }

        $this->consumer->commitAsync($messageOrOffsets);
    }
}
```
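Two details of the committer above are easy to miss: a failed message leaves the committed offset untouched, and an explicit offset commit stores the position of the next message to read, which is why the manual commit methods add 1. The sketch below illustrates both with hypothetical stand-ins (`FakeMessage`, `FakeConsumer`, `SuccessOnlyCommitter`) so that it runs without the rdkafka extension; none of these names come from the package.

```php
<?php

// Hypothetical stand-ins so the sketch runs without the rdkafka extension.
final class FakeMessage
{
    public function __construct(
        public string $topic,
        public int $partition,
        public int $offset,
    ) {}
}

final class FakeConsumer
{
    /** @var array<string, int> Committed offset keyed by "topic:partition". */
    public array $committed = [];

    public function commit(string $topic, int $partition, int $offset): void
    {
        $this->committed["{$topic}:{$partition}"] = $offset;
    }
}

final class SuccessOnlyCommitter
{
    public function __construct(private FakeConsumer $consumer) {}

    public function commitMessage(FakeMessage $message, bool $success): void
    {
        if (! $success) {
            return; // the committed offset is not advanced for failed messages
        }

        // Commit the position of the NEXT message to consume.
        $this->consumer->commit(
            $message->topic,
            $message->partition,
            $message->offset + 1
        );
    }
}

$consumer = new FakeConsumer();
$committer = new SuccessOnlyCommitter($consumer);

// Offset 41 succeeds, so the stored position becomes 42.
$committer->commitMessage(new FakeMessage('orders', 0, 41), true);

// Offset 42 fails, so the stored position stays at 42.
$committer->commitMessage(new FakeMessage('orders', 0, 42), false);
```

After both calls the stored position for `orders:0` is 42, meaning the failed message at offset 42 is still the next one to be read from that position.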

After creating your custom committer, create a committer factory: a simple class that implements the `CommitterFactory` interface and provides your committer to the consumer:

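```
// RetryableCommitter and NativeSleeper are assumed to live in the
// package's Junges\Kafka\Commit namespace.
use Junges\Kafka\Commit\NativeSleeper;
use Junges\Kafka\Commit\RetryableCommitter;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Contracts\Committer as CommitterContract;
use Junges\Kafka\Contracts\CommitterFactory;
use RdKafka\KafkaConsumer;

class CustomCommitterFactory implements CommitterFactory
{
    public function make(KafkaConsumer $kafkaConsumer, Config $config): CommitterContract
    {
        // Wrap the custom committer so that failed commits are retried.
        return new RetryableCommitter(
            new CustomCommitter($kafkaConsumer),
            new NativeSleeper(),
            $config->getMaxCommitRetries()
        );
    }
}
```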
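The factory above wraps one committer inside another that retries failed commits. As a rough illustration of that decorator pattern, and not a description of how `RetryableCommitter` is actually implemented, here is a minimal sketch. `CommitOperation`, `FlakyCommitter` and `RetryingCommitter` are hypothetical names, and the sketch omits any sleep or back-off between attempts.

```php
<?php

// Hypothetical stand-in for a committer; not the package's interface.
interface CommitOperation
{
    public function commit(): void;
}

// Simulates a committer that fails a fixed number of times before succeeding.
final class FlakyCommitter implements CommitOperation
{
    public int $attempts = 0;

    public function __construct(private int $failuresBeforeSuccess) {}

    public function commit(): void
    {
        $this->attempts++;

        if ($this->attempts <= $this->failuresBeforeSuccess) {
            throw new RuntimeException('transient commit failure');
        }
    }
}

// Decorator: same interface as the wrapped committer, plus retry behaviour.
final class RetryingCommitter implements CommitOperation
{
    public function __construct(
        private CommitOperation $inner,
        private int $maxRetries,
    ) {}

    public function commit(): void
    {
        $retries = 0;

        while (true) {
            try {
                $this->inner->commit();

                return;
            } catch (RuntimeException $e) {
                if ($retries >= $this->maxRetries) {
                    throw $e; // give up once the retry budget is spent
                }

                $retries++;
            }
        }
    }
}

$flaky = new FlakyCommitter(failuresBeforeSuccess: 2);
$committer = new RetryingCommitter($flaky, maxRetries: 3);

// Two failures are absorbed by the decorator; the third attempt succeeds.
$committer->commit();
```

Keeping the retry logic in a wrapper means the inner committer only has to decide what to commit, while the factory decides how failures are handled.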

To use this committer, pass the factory to your consumer with `usingCommitterFactory`:

```
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer()
    ->usingCommitterFactory(new CustomCommitterFactory())
    ->build();
```
