Message handlers

Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any callable. You can use an invokable class or a simple callback. Use the withHandler method to specify your handler:

$consumer = \Junges\Kafka\Facades\Kafka::consumer();

// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
    // Handle your message here
});

Or, using an invokable class:

class Handler
{
    public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
        // Handle your message here
    }
}

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler)

The ConsumerMessage contract gives you some handy methods to get the message properties:

  • getKey(): Returns the Kafka Message Key
  • getTopicName(): Returns the topic where the message was published
  • getPartition(): Returns the kafka partition where the message was published
  • getHeaders(): Returns the kafka message headers
  • getBody(): Returns the body of the message
  • getOffset(): Returns the offset where the message was published
Previous
Consumer groups