Setting up a Kafka Cluster with SSL/TLS: A Step-by-Step Guide

When it comes to working with Apache Kafka, security is one of the foremost considerations, especially if you're dealing with sensitive data or deploying in production environments. Using SSL/TLS encryption is a common and highly effective way to secure communication between Kafka clients and brokers. In this blog post, I'll walk through the process of setting up a Kafka cluster with SSL encryption.

By default, Kafka doesn't encrypt network traffic, which means that data sent between producers, brokers, and consumers is vulnerable to interception. SSL/TLS encryption helps you ensure that:

  1. Data in transit is encrypted, so it cannot be read by anyone who intercepts it.
  2. Clients and brokers are authenticated, preventing unauthorized actors from gaining access to the cluster.

Let’s get started with the setup!

First, we need to create a certificate authority (CA) key and certificate. The CA key is used to sign the certificates for the Kafka brokers and clients.

Keep your cluster secured!

We are using self-signed certificates for this tutorial. In a production environment, you should use certificates signed by a trusted certificate authority (CA).

We'll be using 3 Kafka brokers in this setup, so we'll create certificates for each of them. I'll detail the process for creating certificates for the first broker, and you can replicate the steps for the other ones.

This is the directory structure we'll be using:

kafka-ssl-cluster/
├─ secrets-1/
│  ├─ kafka-1.cnf
├─ secrets-2/
│  ├─ kafka-2.cnf
├─ secrets-3/
│  ├─ kafka-3.cnf
├─ docker-compose.yml
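
If you want to create this layout up front, a few commands are enough. This is only a sketch: it assumes you run it from the directory that should contain kafka-ssl-cluster, and it creates the files as empty placeholders that the following steps fill in.

```shell
#!/usr/bin/env bash
# Create the project directory with one secrets directory per broker.
mkdir -p kafka-ssl-cluster/secrets-1 \
         kafka-ssl-cluster/secrets-2 \
         kafka-ssl-cluster/secrets-3

# Create empty placeholders; their contents come from the later steps.
for i in 1 2 3; do
    touch "kafka-ssl-cluster/secrets-$i/kafka-$i.cnf"
done
touch kafka-ssl-cluster/docker-compose.yml
```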

You can generate the CA key and certificate using the following command:

openssl req -new -nodes \
   -x509 \
   -days 365 \
   -newkey rsa:2048 \
   -keyout ca.key \
   -out ca.crt \
   -config ca.cnf

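This command creates a key and a certificate that is valid for 365 days. Because -newkey rsa:2048 is given on the command line, the key is 2048 bits long regardless of the default_bits value in the configuration file. The ca.cnf file contains the configuration for the certificate authority. You can create this file with the following content: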

[ policy_match ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional

[ req ]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
x509_extensions = v3_ca

[ dn ]
countryName = US
organizationName = Confluent
localityName = MountainView
commonName = confluent-ca

[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign

The next step is to combine the certificate and the key into a single .pem file. You can do this by running the following command:

cat ca.crt ca.key > ca.pem

Now, you are ready to generate the certificates for the Kafka brokers and clients. Create the server key and the certificate signing request (CSR) using the following command:

openssl req -new \
    -newkey rsa:2048 \
    -keyout secrets-1/kafka-1.key \
    -out secrets-1/kafka-1.csr \
    -config secrets-1/kafka-1.cnf \
    -nodes

Sign the certificate with the certificate authority:

openssl x509 -req \
    -days 3650 \
    -in secrets-1/kafka-1.csr \
    -CA ca.crt \
    -CAkey ca.key \
    -CAcreateserial \
    -out secrets-1/kafka-1.crt \
    -extfile secrets-1/kafka-1.cnf \
    -extensions v3_req

The kafka-1.cnf file contains the configuration for the Kafka broker certificate. You can create this file with the following content:

[req]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
req_extensions = v3_req

[ dn ]
countryName = US
organizationName = CONFLUENT
localityName = MountainView
commonName=kafka-1

[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign

[ v3_req ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names

[ alt_names ]
DNS.1=kafka-1
DNS.2=kafka-1-external
DNS.3=localhost
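
Once the signed certificate exists, it is worth confirming that it chains to the CA and carries the names from the [ alt_names ] section, since hostname verification relies on them. The sketch below is one way to do that with standard openssl subcommands; check_broker_cert is a hypothetical helper name used only for this illustration.

```shell
#!/usr/bin/env bash
# check_broker_cert is a hypothetical helper for this tutorial.
# Usage: check_broker_cert <ca-certificate> <broker-certificate>
check_broker_cert() {
    ca_cert="$1"
    broker_cert="$2"

    # Fail if the broker certificate was not signed by the given CA.
    openssl verify -CAfile "$ca_cert" "$broker_cert" || return 1

    # Print the subject and validity window of the broker certificate.
    openssl x509 -in "$broker_cert" -noout -subject -dates

    # Print the subject alternative names; treat their absence as an error.
    openssl x509 -in "$broker_cert" -noout -text \
        | grep -A1 "Subject Alternative Name" \
        || { echo "no subject alternative names found" >&2; return 1; }
}
```

With the files from this post, the call would be check_broker_cert ca.crt secrets-1/kafka-1.crt, and the output should list kafka-1, kafka-1-external, and localhost.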

Now, we can convert the server certificate to the PKCS12 format:

openssl pkcs12 -export \
    -in secrets-1/kafka-1.crt \
    -inkey secrets-1/kafka-1.key \
    -chain \
    -CAfile ca.pem \
    -name kafka-1 \
    -out secrets-1/kafka-1.p12 \
    -password pass:confluent

This command should give you a kafka-1.p12 file that contains the server certificate and key. After that, you must create the broker keystore and import the certificate:

keytool -importkeystore \
    -deststorepass confluent \
    -destkeystore secrets-1/kafka.kafka-1.keystore.pkcs12 \
    -srckeystore secrets-1/kafka-1.p12 \
    -deststoretype PKCS12  \
    -srcstoretype PKCS12 \
    -noprompt \
    -srcstorepass confluent

Finally, save the credentials:

echo "confluent" > secrets-1/kafka-1_sslkey_creds
echo "confluent" > secrets-1/kafka-1_keystore_creds

You can use this command to verify the broker keystore:

keytool -list -v \
      -keystore secrets-1/kafka.kafka-1.keystore.pkcs12 \
      -storepass confluent

You can now replicate these steps for the other brokers and clients. Make sure to update the configuration files and names accordingly.
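
For the broker configuration files, one low-effort way to replicate the setup is to derive kafka-2.cnf and kafka-3.cnf from kafka-1.cnf by replacing the broker name. The helper below is a sketch under that assumption: derive_broker_cnf is a hypothetical name, and it presumes the only broker-specific values in the file are occurrences of kafka-1.

```shell
#!/usr/bin/env bash
# derive_broker_cnf is a hypothetical helper for this tutorial.
# It copies secrets-1/kafka-1.cnf to secrets-N/kafka-N.cnf, replacing every
# occurrence of "kafka-1" (commonName and DNS entries) with "kafka-N".
derive_broker_cnf() {
    n="$1"
    src="secrets-1/kafka-1.cnf"
    dest="secrets-$n/kafka-$n.cnf"

    # Refuse to overwrite the template with itself.
    if [ "$n" = "1" ]; then
        echo "broker 1 is the template; pick another number" >&2
        return 1
    fi

    if [ ! -f "$src" ]; then
        echo "missing $src; create it first" >&2
        return 1
    fi

    mkdir -p "secrets-$n"
    sed "s/kafka-1/kafka-$n/g" "$src" > "$dest"
}
```

From the kafka-ssl-cluster directory, running derive_broker_cnf 2 and derive_broker_cnf 3 would produce the two remaining files. The openssl and keytool commands then need the same kafka-1 to kafka-N substitution in their paths and in the -name argument.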

Now that we have our keys and certificates ready, we can set up a Kafka cluster using Docker. You can use the following docker-compose.yml file to create a Kafka cluster with SSL enabled:

version: "3.5"
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    restart: always
    container_name: zookeeper-1
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    volumes:
      - ./data-zk-log-1:/var/lib/zookeeper/log
      - ./data-zk-data-1:/var/lib/zookeeper/data
    networks:
      - confluent
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=12181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    restart: always
    container_name: zookeeper-2
    hostname: zookeeper-2
    ports:
      - "22181:22181"
    volumes:
      - ./data-zk-log-2:/var/lib/zookeeper/log
      - ./data-zk-data-2:/var/lib/zookeeper/data
    networks:
      - confluent
    environment:
      - ZOOKEEPER_SERVER_ID=2
      - ZOOKEEPER_CLIENT_PORT=22181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    restart: always
    container_name: zookeeper-3
    hostname: zookeeper-3
    ports:
      - "32181:32181"
    volumes:
      - ./data-zk-log-3:/var/lib/zookeeper/log
      - ./data-zk-data-3:/var/lib/zookeeper/data
    networks:
      - confluent
    environment:
      - ZOOKEEPER_SERVER_ID=3
      - ZOOKEEPER_CLIENT_PORT=32181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  kafka-1:
    image: confluentinc/cp-kafka:latest
    restart: always
    container_name: kafka-1
    hostname: kafka-1
    ports:
      - "19092:19092"
      - "19093:19093"
    networks:
      - confluent
    volumes:
      - ./data-kafka-1:/var/lib/kafka/data
      - ./secrets-1:/etc/kafka/secrets
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,SSL://0.0.0.0:19093,BROKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092,SSL://localhost:19093,BROKER://kafka-1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,BROKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka-1.keystore.pkcs12
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka-1_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka-1_sslkey_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

  kafka-2:
    image: confluentinc/cp-kafka:latest
    restart: always
    container_name: kafka-2
    hostname: kafka-2
    ports:
      - "29092:29092"
      - "29093:29093"
    networks:
      - confluent
    volumes:
      - ./data-kafka-2:/var/lib/kafka/data
      - ./secrets-2:/etc/kafka/secrets
    environment:
      KAFKA_BROKER_ID: 102
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,SSL://0.0.0.0:29093,BROKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,SSL://localhost:29093,BROKER://kafka-2:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,BROKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka-2.keystore.pkcs12
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka-2_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka-2_sslkey_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

  kafka-3:
    image: confluentinc/cp-kafka:latest
    restart: always
    container_name: kafka-3
    hostname: kafka-3
    ports:
      - "39092:39092"
      - "39093:39093"
    networks:
      - confluent
    volumes:
      - ./data-kafka-3:/var/lib/kafka/data
      - ./secrets-3:/etc/kafka/secrets
    environment:
      KAFKA_BROKER_ID: 103
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,SSL://0.0.0.0:39093,BROKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:39092,SSL://localhost:39093,BROKER://kafka-3:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,BROKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka-3.keystore.pkcs12
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka-3_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka-3_sslkey_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

volumes:
  data-zk-log-1:
  data-zk-data-1:
  data-zk-log-2:
  data-zk-data-2:
  data-zk-log-3:
  data-zk-data-3:
  data-kafka-1:
  data-kafka-2:
  data-kafka-3:

networks:
  confluent:

You can now start the Kafka cluster using the following command:

docker compose up -d

With your docker containers running, you can open a terminal session using this command:

docker compose exec -it kafka-1 bash

We'll use it to create our first Kafka topic:

kafka-topics --bootstrap-server \
  localhost:19092 \
  --create --topic messages \
  --replica-assignment 101:102:103

The command above creates a topic named messages with a single partition whose replicas are placed on brokers 101, 102, and 103 (the KAFKA_BROKER_ID values from the compose file). You can now start producing and consuming messages using this Kafka cluster.

Now that we have a Kafka cluster running with SSL enabled, let's connect to it using Laravel Kafka. Here's an example of how you can create a consumer and connect to the cluster using SSL.

If you don't have Laravel Kafka installed, you can do so by following the installation docs.

// Within a Laravel command:
// (ConsumedMessage and MessageConsumer must also be imported from the package.)
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(brokers: 'localhost:19093')
    ->subscribe('messages')
    ->withOptions([
        'ssl.ca.location' => 'path-to/ca.crt',
        'ssl.certificate.location' => 'path-to/ca.pem',
        'ssl.key.location' => 'path-to/ca.key',
        'ssl.key.password' => 'confluent',
        'enable.ssl.certificate.verification' => 'true',
        'security.protocol' => 'ssl',
    ])
    ->withSecurityProtocol('SSL')
    ->withHandler(function (ConsumedMessage $message, MessageConsumer $messageConsumer) {
        // Handle the consumed message
    })
    ->onStopConsuming(function () {
        $this->line('Consumer stopped.');
    })->build();

$consumer->consume();
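
If the consumer fails to connect, it can help to check the SSL listener independently of Laravel. The sketch below wraps openssl s_client in a small function; probe_ssl_listener is a hypothetical helper name, and the host, port, and CA path in the usage note assume the setup from this post.

```shell
#!/usr/bin/env bash
# probe_ssl_listener is a hypothetical helper for this tutorial.
# Usage: probe_ssl_listener <host> <port> <ca-certificate>
probe_ssl_listener() {
    host="$1"
    port="$2"
    ca_cert="$3"

    # Open a TLS connection, verify the broker certificate against the CA,
    # and close right away (stdin is /dev/null). -verify_return_error makes
    # the handshake fail when certificate verification fails.
    openssl s_client \
        -connect "$host:$port" \
        -CAfile "$ca_cert" \
        -verify_return_error \
        < /dev/null
}
```

With the cluster running, probe_ssl_listener localhost 19093 ca.crt prints the certificate chain that kafka-1 presents. A verification error at this stage usually points at the certificates rather than at the Laravel configuration.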

Setting up Kafka with SSL encryption helps you secure your data and ensure that only authorized clients can connect to your brokers. While the process involves quite a few steps, it provides robust protection for your Kafka cluster. As with any security feature, it's important to regularly review and update your certificates and keys to maintain security over time.

If you're looking to scale your Kafka deployment or implement advanced security features like mutual authentication or ACLs, this SSL setup is an essential first step.

If you like this post, consider sharing it with your friends and colleagues. If you have any questions or feedback, feel free to reach out to me on Twitter.

If you had any issues following along, you can find the complete code for this blog post on GitHub.
