Message handlers
Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any callable
.
You can use an invokable class or a simple callback. Use the withHandler
method to specify your handler:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer(); // Using callback: $consumer->withHandler(function(\Junges\Kafka\Contracts\KafkaConsumerMessage $message) { // Handle your message here });
Or, using an invokable class:
class Handler { public function __invoke(\Junges\Kafka\Contracts\KafkaConsumerMessage $message){ // Handle your message here } } $consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withHandler(new Handler)
The KafkaConsumerMessage
contract gives you some handy methods to get the message properties:
getKey()
: Returns the Kafka Message KeygetTopicName()
: Returns the topic where the message was publishedgetPartition()
: Returns the kafka partition where the message was publishedgetHeaders()
: Returns the kafka message headersgetBody()
: Returns the body of the messagegetOffset()
: Returns the offset where the message was published