Partition Discovery and Dynamic Assignment
==========================================

The Laravel Kafka package provides several methods to discover and work with partition assignments dynamically, which is especially useful when you need to set specific offsets but don't know the partition numbers in advance.

Getting Assigned Partitions
---------------------------

After building a consumer, you can retrieve its current partition assignment:

```
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['my-topic'], 'my-group')
    ->withHandler(function ($message) {
        // Handle message
    })
    ->build();

// Get the assigned partitions (returns array of RdKafka\TopicPartition objects)
$assignedPartitions = $consumer->getAssignedPartitions();

foreach ($assignedPartitions as $partition) {
    echo "Topic: {$partition->getTopic()}, Partition: {$partition->getPartition()}\n";
}
```

**Note:** `getAssignedPartitions()` returns an empty array until the consumer has been initialized and partitions have been assigned by the broker.

Partition Assignment Callbacks
------------------------------

You can set a callback that gets executed whenever partitions are assigned to your consumer:

```
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['my-topic'], 'my-group')
    ->withPartitionAssignmentCallback(function ($partitions) {
        echo "Assigned " . count($partitions) . " partitions:\n";

        foreach ($partitions as $partition) {
            echo "- Topic: {$partition->getTopic()}, Partition: {$partition->getPartition()}\n";
        }
    })
    ->withHandler(function ($message) {
        // Handle message
    });
```

This callback is particularly useful for:

- Logging partition assignments for debugging
- Initializing partition-specific resources
- Tracking partition assignment changes during rebalancing
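To illustrate the last use case, here is a minimal sketch of how assignment changes could be tracked between two rebalances. It is not part of the package: `diffAssignments()` is a hypothetical helper, and it assumes each assignment has already been reduced to a list of `"topic:partition"` strings (for example, built from `getTopic()` and `getPartition()` inside the callback).

```php
<?php

/**
 * Hypothetical helper (not provided by Laravel Kafka).
 * Compares two assignments, each given as a list of "topic:partition"
 * strings, and reports which partitions were gained and which were lost.
 */
function diffAssignments(array $previous, array $current): array
{
    return [
        // Present now, but not in the previous assignment
        'added' => array_values(array_diff($current, $previous)),
        // Present before, but no longer assigned
        'removed' => array_values(array_diff($previous, $current)),
    ];
}

// Example: partition 0 was revoked and partition 2 was gained
$previous = ['orders:0', 'orders:1'];
$current = ['orders:1', 'orders:2'];

$diff = diffAssignments($previous, $current);
```

Inside a partition assignment callback, the previous list would have to be kept somewhere that outlives a single invocation, such as a variable captured by reference or a property on a long-lived object.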

Dynamic Partition Assignment with Offsets
-----------------------------------------

With `assignPartitionsWithOffsets()`, you can set the starting offset for each partition dynamically, based on the partitions that were discovered:

```
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['my-topic'], 'my-group')
    ->assignPartitionsWithOffsets(function ($partitions) {
        $partitionsWithOffsets = [];

        foreach ($partitions as $partition) {
            // Set different offset strategies based on partition
            if ($partition->getPartition() === 0) {
                // Start from the beginning for partition 0
                $partition->setOffset(RD_KAFKA_OFFSET_BEGINNING);
            } elseif ($partition->getPartition() === 1) {
                // Start from the end for partition 1
                $partition->setOffset(RD_KAFKA_OFFSET_END);
            } else {
                // Start from a specific offset for other partitions
                $partition->setOffset(1000);
            }

            $partitionsWithOffsets[] = $partition;
        }

        return $partitionsWithOffsets;
    })
    ->withHandler(function ($message) {
        // Handle message
    });
```

Practical Examples
------------------

### Resume from Stored Offsets

```
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;

$consumer = \Junges\Kafka\Facades\Kafka::consumer(['user-events'], 'analytics-group')
    ->assignPartitionsWithOffsets(function ($partitions) {
        $partitionsWithOffsets = [];

        foreach ($partitions as $partition) {
            // Get stored offset from database or cache
            $storedOffset = Cache::get("kafka_offset_{$partition->getTopic()}_{$partition->getPartition()}", 0);

            $partition->setOffset($storedOffset);
            $partitionsWithOffsets[] = $partition;

            Log::info("Resuming from offset {$storedOffset} for partition {$partition->getPartition()}");
        }

        return $partitionsWithOffsets;
    })
    ->withHandler(function ($message) {
        // Handle message

        // Store the next offset to read (offset of the processed message + 1)
        Cache::put("kafka_offset_{$message->getTopicName()}_{$message->getPartition()}", $message->getOffset() + 1);
    });
```
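The callback and the handler in this example must agree on the cache key and on what the stored number means. One possible way to keep them in sync is a pair of small helpers, sketched below. Both function names are hypothetical and not part of the package; the key format simply mirrors the one used in the example.

```php
<?php

// Hypothetical helper: builds the cache key for a topic partition,
// using the same "kafka_offset_{topic}_{partition}" format as the example.
function offsetCacheKey(string $topic, int $partition): string
{
    return "kafka_offset_{$topic}_{$partition}";
}

// Hypothetical helper: the value to store is the next offset to read,
// i.e. one past the offset of the message that was just processed.
function nextOffsetToRead(int $lastProcessedOffset): int
{
    return $lastProcessedOffset + 1;
}

$key = offsetCacheKey('user-events', 3);
$next = nextOffsetToRead(41);
```

Storing the next offset rather than the last processed one means a restarted consumer does not reprocess the message it had already handled.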

### Time-based Offset Discovery

```
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['transactions'], 'payment-processor')
    ->assignPartitionsWithOffsets(function ($partitions) {
        $partitionsWithOffsets = [];

        // Target timestamp in milliseconds (e.g., start of today).
        // It is not used below: this simplified example does not perform the lookup.
        $targetTimestamp = strtotime('today') * 1000;

        foreach ($partitions as $partition) {
            // A real implementation would resolve $targetTimestamp to an offset
            // for each partition, typically using the low-level consumer.
            // As a placeholder, start from the beginning of the partition.
            $partition->setOffset(RD_KAFKA_OFFSET_BEGINNING);
            $partitionsWithOffsets[] = $partition;
        }

        return $partitionsWithOffsets;
    })
    ->withHandler(function ($message) {
        processTransaction($message);
    });
```
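Kafka timestamps are expressed in milliseconds since the Unix epoch, and `strtotime('today')` depends on PHP's default timezone. The sketch below shows one way to compute the target timestamp with an explicit timezone. `startOfDayMillis()` is a hypothetical helper, not part of the package, and it assumes a 64-bit PHP build so the millisecond value fits in an integer.

```php
<?php

/**
 * Hypothetical helper (not provided by Laravel Kafka).
 * Returns midnight of the given date in the given timezone,
 * as milliseconds since the Unix epoch.
 */
function startOfDayMillis(string $date, string $timezone = 'UTC'): int
{
    $midnight = new DateTimeImmutable($date . ' 00:00:00', new DateTimeZone($timezone));

    // getTimestamp() returns seconds; Kafka expects milliseconds
    return $midnight->getTimestamp() * 1000;
}

$targetTimestamp = startOfDayMillis('2024-01-01');
```

The resulting value is what a timestamp-based offset lookup would take as input. How that lookup is performed is outside the scope of this sketch.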

### Partition-Specific Processing

```
use Illuminate\Support\Facades\Log;

$consumer = \Junges\Kafka\Facades\Kafka::consumer(['orders'], 'order-processor')
    ->withPartitionAssignmentCallback(function ($partitions) {
        // Initialize partition-specific resources
        foreach ($partitions as $partition) {
            $partitionId = $partition->getPartition();

            // Each partition might handle different regions
            initializeRegionProcessor($partitionId);

            Log::info("Initialized processor for region partition {$partitionId}");
        }
    })
    ->withHandler(function ($message) {
        $partitionId = $message->getPartition();
        processOrderForRegion($message, $partitionId);
    });
```

Combining with Manual Assignment
--------------------------------

You can also combine dynamic discovery with manual partition assignment:

```
$builder = \Junges\Kafka\Facades\Kafka::consumer(['my-topic'], 'my-group')
    ->assignPartitions([
        new \RdKafka\TopicPartition('my-topic', 0, 100),  // Start from offset 100
        new \RdKafka\TopicPartition('my-topic', 1, RD_KAFKA_OFFSET_END),  // Start from end
    ])
    ->withHandler(function ($message) {
        // Handle message
    });

// Later, build the consumer and read back the current assignment
$consumer = $builder->build();
$partitions = $consumer->getAssignedPartitions();
```

Important Notes
---------------

1. **Timing**: Partition assignments happen during consumer group rebalancing, which occurs when consumers join or leave the group.
2. **Consumer Groups**: If you're using consumer groups, partition assignments are managed by Kafka's partition assignment strategy. Manual assignments override consumer group behavior.
3. **Rebalancing**: When using `withPartitionAssignmentCallback()` or `assignPartitionsWithOffsets()`, your callbacks will be called every time a rebalance occurs.
4. **Error Handling**: Always handle potential errors in your callbacks, as exceptions can disrupt the rebalancing process.
5. **Performance**: Partition assignment callbacks should be fast, as they block the rebalancing process.
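
As an illustration of the error handling note, the following is a minimal sketch of a wrapper that catches anything thrown by a callback and delegates to a fallback instead. `guardCallback()` is a hypothetical helper, not part of the package, and the partitions are represented as plain strings to keep the example self-contained. Whether returning the untouched assignment is an acceptable fallback depends on your application.

```php
<?php

/**
 * Hypothetical helper (not provided by Laravel Kafka).
 * Wraps a callback so that any exception it throws is passed to $onError,
 * whose return value is used in place of the callback's result.
 */
function guardCallback(callable $callback, callable $onError): Closure
{
    return function (...$args) use ($callback, $onError) {
        try {
            return $callback(...$args);
        } catch (Throwable $e) {
            // Hand the exception and the original arguments to the fallback
            return $onError($e, ...$args);
        }
    };
}

$errors = [];

$safeCallback = guardCallback(
    function (array $partitions) {
        // Simulates a failure, e.g. the offset store being unreachable
        throw new RuntimeException('offset store unavailable');
    },
    function (Throwable $e, array $partitions) use (&$errors) {
        $errors[] = $e->getMessage();

        // Fall back to the assignment exactly as it was received
        return $partitions;
    }
);

$result = $safeCallback(['orders:0']);
```

The guarded closure could then be passed wherever the original callback would have been used, with logging added in the fallback.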

This partition discovery functionality solves the common problem of needing to know partition numbers before consumption starts, making it much easier to implement features like offset management, partition-aware processing, and resumable consumers.

