Manual Commit | Laravel Kafka

Manual Commit
=============

Manual commit gives you complete control over when message offsets are committed to Kafka. This provides stronger processing guarantees and better error handling compared to auto-commit mode.


Overview
--------

By default, the package uses auto-commit mode where messages are automatically committed after your handler successfully processes them. With manual commit, you decide exactly when to commit messages, allowing for:

- **At-least-once delivery**: Ensure messages are only committed after successful processing
- **Better error handling**: Don't commit messages that failed to process
- **Custom commit strategies**: Implement batch commits, conditional commits, or other patterns
- **Performance optimization**: Use asynchronous commits for better throughput

Enabling Manual Commit
----------------------

To enable manual commit, call `withManualCommit()` when building your consumer:

```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(['my-topic'])
    ->withManualCommit() // Disable auto-commit
    ->withHandler(function($message, $consumer) {
        // Your handler with manual commit control
    })
    ->build();
```

Commit Methods API
------------------

Your message handlers receive a `$consumer` parameter with these commit methods:

### Synchronous Commits (Blocking)

```php
// Commit all current assignment offsets
$consumer->commit();

// Commit specific message offset
$consumer->commit($message);

// Commit specific partition offsets
$consumer->commit([$topicPartition1, $topicPartition2]);
```

### Asynchronous Commits (Non-blocking)

```php
// Commit all current assignment offsets asynchronously
$consumer->commitAsync();

// Commit specific message offset asynchronously
$consumer->commitAsync($message);

// Commit specific partition offsets asynchronously
$consumer->commitAsync([$topicPartition1, $topicPartition2]);
```

### Parameters

All commit methods accept these parameter types:

- **`null`** (default): Commit offsets for current assignment
- **`ConsumerMessage`**: Commit offset for the specific message
- **`RdKafka\Message`**: Commit offset for the underlying Kafka message
- **`RdKafka\TopicPartition[]`**: Commit specific partition offsets
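As an illustration of the last form, partition offsets can be committed explicitly with `RdKafka\TopicPartition` objects. This is a sketch, not package-specific API: it assumes the `ext-rdkafka` extension and a hypothetical `orders` topic, and relies on the Kafka convention that a committed offset is the offset of the *next* message to consume (i.e. last processed offset + 1):

```php
use RdKafka\TopicPartition;

// Suppose offsets 42 (partition 0) and 100 (partition 1) were just processed.
// Commit processedOffset + 1, since the committed offset is the next one
// the consumer group should read after a restart or rebalance.
$consumer->commit([
    new TopicPartition('orders', 0, 43),
    new TopicPartition('orders', 1, 101),
]);
```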

Basic Usage Patterns
--------------------

### Simple Manual Commit

```php
$consumer = Kafka::consumer(['orders'])
    ->withManualCommit()
    ->withHandler(function($message, $consumer) {
        try {
            // Process the order
            $order = json_decode($message->getBody(), true);
            processOrder($order);

            // Commit only after successful processing
            $consumer->commit($message);

        } catch (Exception $e) {
            Log::error('Order processing failed', [
                'error' => $e->getMessage(),
                'order_id' => $order['id'] ?? 'unknown'
            ]);
        }
    });
```

### Async Commit for Better Performance

```php
$consumer->withHandler(function($message, $consumer) {
    // Process message
    processMessage($message);

    // Use async commit for better throughput
    $consumer->commitAsync($message);

    // Handler can continue immediately without waiting for commit
});
```

Error Handling
--------------

### Retry Logic with Manual Commit

```php
$consumer->withHandler(function($message, $consumer) {
    $maxRetries = 3;
    $attempt = 0;

    while ($attempt < $maxRetries) {
        try {
            processMessage($message);
            $consumer->commit($message);
            return; // Success

        } catch (RetryableException $e) {
            $attempt++;
            Log::warning("Retry attempt {$attempt}", ['error' => $e->getMessage()]);

            if ($attempt >= $maxRetries) {
                // Send to DLQ or handle permanent failure
                Log::error('Max retries exceeded', ['error' => $e->getMessage()]);
                throw $e;
            }

            sleep(pow(2, $attempt));

        } catch (Exception $e) {
            // Non-retryable error
            Log::error('Non-retryable error', ['error' => $e->getMessage()]);
            throw $e;
        }
    }
});
```

### Dead Letter Queue Integration

Manual commit works seamlessly with DLQ functionality:

```php
$consumer = Kafka::consumer(['orders'])
    ->withManualCommit()
    ->withDlq('orders-dlq') // Configure DLQ
    ->withHandler(function($message, $consumer) {
        try {
            processOrder($message);
            $consumer->commit($message);

        } catch (ValidationException $e) {
            // Don't commit - let DLQ handling take over
            Log::error('Invalid order format', ['error' => $e->getMessage()]);
            throw $e; // This will trigger DLQ
        }
    });
```

Performance Considerations
--------------------------

### Sync vs Async Commits

- **Synchronous commits**: Slower, because the call blocks until the broker confirms the commit
- **Asynchronous commits**: Faster, but fire-and-forget; commit failures are not reported back to your handler

```php
// High-throughput scenario - use async
$consumer->commitAsync($message);

// Critical data - use sync for guarantee
$consumer->commit($message);
```

### Batch Commit Optimization

```php
$consumer->withHandler(function($message, $consumer) {
    static $messages = [];

    // Collect messages
    $messages[] = $message;

    // Batch commit every 100 messages
    if (count($messages) >= 100) {
        // Process all messages
        foreach ($messages as $msg) {
            processMessage($msg);
        }

        // Commit the last message's offset (this also commits all earlier
        // offsets on the same partition, but not on other partitions)
        $consumer->commitAsync(end($messages));
        $messages = [];
    }
});
```

Migration from Auto-Commit
--------------------------

To migrate existing auto-commit consumers to manual commit:

### Before (Auto-commit)

```php
$consumer = Kafka::consumer(['topic'])
    ->withAutoCommit()  // or omit, it's the default
    ->withHandler(function($message, $consumer) {
        processMessage($message);
    });
```

### After (Manual commit)

```php
$consumer = Kafka::consumer(['topic'])
    ->withManualCommit() // Enable manual control
    ->withHandler(function($message, $consumer) {
        try {
            processMessage($message);
            $consumer->commit($message); // Explicit commit
        } catch (Exception $e) {
            // Handle errors without committing
            Log::error('Processing failed', ['error' => $e->getMessage()]);
        }
    });
```

Best Practices
--------------

1. **Always commit after successful processing**: Only commit messages that were fully processed
2. **Use async commits for performance**: Unless you need commit guarantees, use `commitAsync()`
3. **Implement proper error handling**: Don't commit messages that failed to process
4. **Handle duplicate processing**: Manual commit provides at-least-once delivery, so implement idempotent processing
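Because at-least-once delivery means the same message may be seen twice after a crash or rebalance, handlers should be idempotent. Here is a minimal illustrative sketch; the `ProcessedMessageTracker` class and its key scheme are not part of the package, and an in-memory array would be replaced by a database or shared cache in production:

```php
// Illustrative in-memory deduplication tracker. In production you would
// back this with a database or cache that survives consumer restarts.
final class ProcessedMessageTracker
{
    /** @var array<string, bool> */
    private array $seen = [];

    public function alreadyProcessed(string $topic, int $partition, int $offset): bool
    {
        return isset($this->seen[$this->key($topic, $partition, $offset)]);
    }

    public function markProcessed(string $topic, int $partition, int $offset): void
    {
        $this->seen[$this->key($topic, $partition, $offset)] = true;
    }

    private function key(string $topic, int $partition, int $offset): string
    {
        return "{$topic}:{$partition}:{$offset}";
    }
}
```

Inside a handler, you would check `alreadyProcessed(...)` before doing the work, call `markProcessed(...)` once it succeeds, and only then call `commit($message)`.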

Troubleshooting
---------------

### Common Issues

**Messages being reprocessed repeatedly:**

- Check that you're calling `commit()` after successful processing
- Ensure exceptions don't prevent the commit call
- Verify your error handling doesn't commit failed messages
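A common cause of reprocessing loops is a poison message that always throws before `commit()` is reached. One way to let the group make progress is to catch the known-bad case and fall through to the commit; this is a sketch only (`InvalidPayloadException` is a hypothetical application exception, and whether to skip or route to a DLQ is an application decision):

```php
$consumer->withHandler(function ($message, $consumer) {
    try {
        processMessage($message);
    } catch (InvalidPayloadException $e) {
        // Known-bad payload: log and fall through so the commit below still
        // runs; otherwise this message would be redelivered forever.
        Log::warning('Skipping malformed message', ['error' => $e->getMessage()]);
    }

    $consumer->commit($message);
});
```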

**Poor performance:**

- Use `commitAsync()` instead of `commit()` for better throughput
- Implement batch commits for high-volume scenarios
- Avoid committing every single message in high-throughput scenarios

**Offset commit errors:**

- Check Kafka broker connectivity
- Verify consumer group permissions
- Monitor Kafka logs for commit-related errors
