Message handlers | Laravel Kafka

 [ Laravel Kafka ](/)

     Search

 ⌘K

  [   Login with GitHub ](https://laravelkafka.com/oauth/github/redirect)

    Docs for version          selected

 v2.11

 v2.10

 v2.9

 v2.8

 v1.13

- - [ Introduction ](/docs/v2.9/introduction)
    - [ Requirements ](/docs/v2.9/requirements)
    - [ Installation and Setup ](/docs/v2.9/installation-and-setup)
    - [ Questions and issues ](/docs/v2.9/questions-and-issues)
    - [ Changelog ](/docs/v2.9/changelog)
    - [ Upgrade guide ](/docs/v2.9/upgrade-guide)
    - [ Example docker-compose file ](/docs/v2.9/example-docker-compose)
- Producing messages
    ------------------

    - [ Producing messages ](/docs/v2.9/producing-messages/producing-messages)
    - [ Configuring your kafka producer ](/docs/v2.9/producing-messages/configuring-producers)
    - [ Configuring message payload ](/docs/v2.9/producing-messages/configuring-message-payload)
    - [ Custom serializers ](/docs/v2.9/producing-messages/custom-serializers)
    - [ Publishing to kafka ](/docs/v2.9/producing-messages/publishing-to-kafka)
- Consuming messages
    ------------------

    - [ Creating a kafka consumer ](/docs/v2.9/consuming-messages/creating-consumer)
    - [ Subscribing to kafka topics ](/docs/v2.9/consuming-messages/subscribing-to-kafka-topics)
    - [ Using regex to subscribe to kafka topics ](/docs/v2.9/consuming-messages/using-regex-to-subscribe-to-kafka-topics)
    - [ Assigning consumers to a topic partition ](/docs/v2.9/consuming-messages/assigning-partitions)
    - [ Consuming messages from specific offsets ](/docs/v2.9/consuming-messages/consuming-from-specific-offsets)
    - [ Consumer groups ](/docs/v2.9/consuming-messages/consumer-groups)
    - [ Partition Discovery and Dynamic Assignment ](/docs/v2.9/consuming-messages/partition-discovery)
    - [ Message handlers ](/docs/v2.9/consuming-messages/message-handlers)
    - [ Configuring consumer options ](/docs/v2.9/consuming-messages/configuring-consumer-options)
    - [ Custom deserializers ](/docs/v2.9/consuming-messages/custom-deserializers)
    - [ Consuming messages ](/docs/v2.9/consuming-messages/consuming-messages)
    - [ Class structure ](/docs/v2.9/consuming-messages/class-structure)
    - [ Queueable handlers ](/docs/v2.9/consuming-messages/queueable-handlers)
- Advanced usage
    --------------

    - [ Replacing the default serializer/deserializer ](/docs/v2.9/advanced-usage/replacing-default-serializer)
    - [ Graceful shutdown ](/docs/v2.9/advanced-usage/graceful-shutdown)
    - [ SASL Authentication ](/docs/v2.9/advanced-usage/sasl-authentication)
    - [ Custom Committers ](/docs/v2.9/advanced-usage/custom-committers)
    - [ Manual Commit ](/docs/v2.9/advanced-usage/manual-commit)
    - [ Middlewares ](/docs/v2.9/advanced-usage/middlewares)
    - [ Stop consumer after last messages ](/docs/v2.9/advanced-usage/stop-consumer-after-last-message)
    - [ Stop consumer on demand ](/docs/v2.9/advanced-usage/stopping-a-consumer)
    - [ Writing custom loggers ](/docs/v2.9/advanced-usage/custom-loggers)
    - [ Before and after callbacks ](/docs/v2.9/advanced-usage/before-callbacks)
    - [ Setting global configurations ](/docs/v2.9/advanced-usage/setting-global-configuration)
    - [ Sending multiple messages with the same producer ](/docs/v2.9/advanced-usage/sending-multiple-messages-with-the-same-producer)
- Testing
    -------

    - [ Kafka fake ](/docs/v2.9/testing/fake)
    - [ Assert Published ](/docs/v2.9/testing/assert-published)
    - [ Assert published On ](/docs/v2.9/testing/assert-published-on)
    - [ Assert nothing published ](/docs/v2.9/testing/assert-nothing-published)
    - [ Assert published times ](/docs/v2.9/testing/assert-published-times)
    - [ Assert published on times ](/docs/v2.9/testing/assert-published-on-times)
    - [ Mocking your kafka consumer ](/docs/v2.9/testing/mocking-your-kafka-consumer)

  Message handlers
==================

Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any `callable`. You can use an invokable class or a simple callback. Use the `withHandler` method to specify your handler:

         ```
$consumer = \Junges\Kafka\Facades\Kafka::consumer();

// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
    // Handle your message here
});
```

Or, using an invokable class:

         ```
class Handler
{
    public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
        // Handle your message here
    }
}

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler)
```

The `ConsumerMessage` contract gives you some handy methods to get the message properties:

- `getKey()`: Returns the Kafka Message Key
- `getTopicName()`: Returns the topic where the message was published
- `getPartition()`: Returns the kafka partition where the message was published
- `getHeaders()`: Returns the kafka message headers
- `getBody()`: Returns the body of the message
- `getOffset()`: Returns the offset where the message was published

[](#content-manual-commit-in-handlers "Permalink")Manual Commit in Handlers
---------------------------------------------------------------------------

When using manual commit mode (`withAutoCommit(false)`), your handlers receive a `$consumer` parameter that provides commit methods. This allows you to control exactly when message offsets are committed:

         ```
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withManualCommit()  // Enable manual commit mode
    ->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
        try {
            // Process your message
            $data = json_decode($message->getBody(), true);
            processBusinessLogic($data);

            // Commit the message after successful processing
            $consumer->commit($message);

        } catch (ValidationException $e) {
            // Don't commit invalid messages, send to DLQ or handle differently
            Log::warning('Invalid message format', ['message' => $message->getBody()]);

        } catch (Exception $e) {
            Log::error('Processing failed', ['error' => $e->getMessage()]);
            throw $e;
        }
    });
```

### [](#content-available-commit-methods "Permalink")Available Commit Methods

The `$consumer` parameter provides these commit methods:

**Synchronous commits** (blocking):

- `$consumer->commit()` - Commit current assignment offsets
- `$consumer->commit($message)` - Commit specific message offset

**Asynchronous commits** (non-blocking, better performance):

- `$consumer->commitAsync()` - Commit current assignment offsets
- `$consumer->commitAsync($message)` - Commit specific message offset

[](#content-handler-classes "Permalink")Handler Classes
-------------------------------------------------------

You can also create dedicated handler classes by implementing the `Handler` interface. Handler classes receive both the message and consumer parameters, just like closure handlers:

         ```
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\MessageConsumer;

class ProcessOrderHandler implements Handler
{
    public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void
    {
        try {
            $order = json_decode($message->getBody(), true);

            // Process the order
            $this->processOrder($order);

            // Manual commit after successful processing
            $consumer->commit($message);

        } catch (ValidationException $e) {
            // Don't commit invalid messages
            Log::warning('Invalid order data', ['message' => $message->getBody()]);

        } catch (Exception $e) {
            // Don't commit on processing errors
            Log::error('Order processing failed', ['error' => $e->getMessage()]);
            throw $e; // Re-throw to trigger DLQ handling if configured
        }
    }

    private function processOrder(array $order): void
    {
        // Your business logic here
    }
}
```

**Using Handler classes with the consumer:**

         ```
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(['orders'])
    ->withManualCommit()  // Enable manual commit mode
    ->withHandler(new ProcessOrderHandler())
    ->build();

$consumer->consume();
```

Support Laravel Kafka by sponsoring me!

Do you find Laravel Kafka valuable and wanna support its development?

Laravel Kafka is free and Open Source software, built to empower developers like you. Your support helps maintain and enhance the project. If you find it valuable, please consider sponsoring me on GitHub. Every contribution makes a difference and keeps the development going strong! Thank you!

   [ Become a Sponsor ](https://github.com/sponsors/mateusjunges)

 Want to hide this message? Sponsor at any tier of $10/month or more!

 Previous  [ Partition Discovery and Dynamic Assignment    ](https://laravelkafka.com/docs/v2.9/consuming-messages/partition-discovery)

 Next  [ Configuring consumer options    ](https://laravelkafka.com/docs/v2.9/consuming-messages/configuring-consumer-options)

Sponsors

 [ version="1.0" encoding="UTF-8"?       EasyCal ](https://easycal.app/)

 [       Search  ⌘ K   ](https://typesense.org/)
