Stop consumer on demand

Sometimes, you may want to stop your consumer based on a given message or any other condition.

You can do this by calling the stopConsuming() method on the MessageConsumer instance that is passed as the second argument to your message handler:

$consumer = \Junges\Kafka\Facades\Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
    ->stopAfterLastMessage()
    ->withHandler(static function (\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
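        // $someCondition is a placeholder: replace it with your own check, for example one based on $message.
        if ($someCondition) {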
            $consumer->stopConsuming();
        }
    })
    ->build();

$consumer->consume();

If you have registered an onStopConsuming callback, it will be executed before your consumer stops.
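As a rough illustration, the sketch below replaces the placeholder condition with a concrete one and registers a stop callback. It makes two assumptions that are not stated on this page: that producers publish a message whose decoded body is an array containing 'action' => 'stop' (a hypothetical convention), and that onStopConsuming() is registered on the consumer builder, as in recent versions of the package (older versions may expect it on the built consumer instead).

```php
<?php

use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\MessageConsumer;
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
    ->withHandler(static function (ConsumerMessage $message, MessageConsumer $consumer) {
        $body = $message->getBody();

        // Hypothetical convention: a body of ['action' => 'stop'] asks the consumer to stop.
        if (is_array($body) && ($body['action'] ?? null) === 'stop') {
            $consumer->stopConsuming();
        }
    })
    // Assumption: the callback is registered on the builder in your installed version.
    ->onStopConsuming(static function () {
        // Runs before the consumer stops; a good place for cleanup or logging.
        logger()->info('Kafka consumer is stopping.');
    })
    ->build();

$consumer->consume();
```

Running this requires a Laravel application with the package installed and a reachable Kafka broker. Check the signatures against the version you have installed before relying on it.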
