Configuring consumer options

The ConsumerBuilder offers you some few configuration options.

Support Laravel Kafka by sponsoring me!

Do you find Laravel Kafka valuable and wanna support its development?

Laravel Kafka is free and Open Source software, built to empower developers like you. Your support helps maintain and enhance the project. If you find it valuable, please consider sponsoring me on GitHub. Every contribution makes a difference and keeps the development going strong! Thank you!

Want to hide this message? Sponsor at any tier of $10/month or more!

In kafka, a Dead Letter Queue (or DLQ), is a simple kafka topic in the kafka cluster which acts as the destination for messages that were not able to make it to the desired destination due to some error.

To create a dlq in this package, you can use the withDlq method. If you don't specify the DLQ topic name, it will be created based on the topic you are consuming, adding the -dlq suffix to the topic name.

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq();

//Or, specifying the dlq topic name:
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq('your-dlq-topic-name')

When your message is sent to the dead letter queue, we will add three header keys to containing information about what happened to that message:

  • kafka_throwable_message: The exception message
  • kafka_throwable_code: The exception code
  • kafka_throwable_class_name: The exception class name.

The package supports two commit modes for controlling when message offsets are committed to Kafka:

With auto-commit enabled, messages are automatically committed after your handler successfully processes them. This is the default behavior and simplest to use:

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withAutoCommit() // Optional as this is the default
    ->withHandler(function($message, $consumer) {
        // Process your message.
        // Message is automatically committed after handler returns successfully
    });

With manual commit, you have full control over when messages are committed. This provides better error handling and processing guarantees:

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withManualCommit()
    ->withHandler(function($message, $consumer) {
        try {
            // Process your message
            processMessage($message);
            
            // Manually commit the message
            $consumer->commit($message);  // Synchronous commit
            // OR: $consumer->commitAsync($message);  // Asynchronous commit
            
        } catch (Exception $e) {
            Log::error('Message processing failed', ['error' => $e->getMessage()]);
        }
    });
  • Auto-commit: Simple use cases where message loss is acceptable, and you want automatic offset management
  • Manual commit: When you need guaranteed processing, complex error handling, or want to implement custom commit strategies

When using manual commit mode, your handlers can use these methods on the $consumer parameter:

  • commit() - Commit current assignment offsets (synchronous)
  • commit($message) - Commit specific message offset (synchronous)
  • commitAsync() - Commit current assignment offsets (asynchronous)
  • commitAsync($message) - Commit specific message offset (asynchronous)

For more detailed information about manual commit patterns, see the Manual Commit guide .

If you want to consume a limited amount of messages, you can use the withMaxMessages method to set the max number of messages to be consumed by a kafka consumer:

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxMessages(2);

If you want to consume a limited amount of time, you can use the withMaxTime method to set the max number of seconds for kafka consumer to process messages:

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxTime(3600);

To set configuration options, you can use two methods: withOptions, passing an array of option and option value or, using the `withOption method and passing two arguments, the option name and the option value.

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withOptions([
        'option-name' => 'option-value'
    ]);
// Or:
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withOption('option-name', 'option-value');
Previous
Message handlers