When it comes to working with Apache Kafka, security is one of the foremost considerations, especially if you're dealing with sensitive data or deploying in production environments. Using SSL/TLS encryption is a common and highly effective way to secure communication between Kafka clients and brokers. In this blog post, I'll walk through the process of setting up a Kafka cluster with SSL encryption.
Support Laravel Kafka by sponsoring me!
Do you find Laravel Kafka valuable and want to support its development?
Laravel Kafka is free and Open Source software, built to empower developers like you. Your support helps maintain and enhance the project. If you find it valuable, please consider sponsoring me on GitHub. Every contribution makes a difference and keeps the development going strong! Thank you!
Why SSL for Kafka?
By default, Kafka doesn't encrypt network traffic, which means that data sent between producers, brokers, and consumers is vulnerable to interception. SSL/TLS encryption helps you ensure that:
- Data in transit is encrypted, so it cannot be read by anyone who intercepts it.
- Clients and brokers are authenticated, preventing unauthorized actors from gaining access to the cluster.
Let’s get started with the setup!
Create the certificate authority key and certificate
First, we need to create a certificate authority (CA) key and certificate. The CA key is used to sign the certificates for the Kafka brokers and clients.
We'll be using 3 Kafka brokers in this setup, so we'll create certificates for each of them. I'll detail the process for creating certificates for the first broker, and you can replicate the steps for the other ones.
This is the directory structure we'll be using:
kafka-ssl-cluster/
├─ secrets-1/
│  ├─ kafka-1.cnf
├─ secrets-2/
│  ├─ kafka-2.cnf
├─ secrets-3/
│  ├─ kafka-3.cnf
├─ docker-compose.yml
You can generate the CA key and certificate using the following command:
openssl req -new -nodes \
  -x509 \
  -days 365 \
  -newkey rsa:2048 \
  -keyout ca.key \
  -out ca.crt \
  -config ca.cnf
This command creates a key and certificate that is valid for 365 days. The ca.cnf file contains the configuration for the certificate authority. You can create this file with the following content:
[ policy_match ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional

[ req ]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
x509_extensions = v3_ca

[ dn ]
countryName = US
organizationName = Confluent
localityName = MountainView
commonName = confluent-ca

[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign
The next step is to combine the CA certificate and key into a single ca.pem bundle. You can do this by running the following command:
cat ca.crt ca.key > ca.pem
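If you want to double-check what you just generated, a quick look at the CA certificate's subject and validity dates never hurts. This is optional, but it catches typos in ca.cnf early:

openssl x509 -in ca.crt -noout -subject -dates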
Now, you are ready to generate the certificates for the Kafka brokers and clients. Create the server key and certificate using the following command:
openssl req -new \
  -newkey rsa:2048 \
  -keyout secrets-1/kafka-1.key \
  -out secrets-1/kafka-1.csr \
  -config secrets-1/kafka-1.cnf \
  -nodes
Sign the certificates with the certificate authority:
openssl x509 -req \
  -days 3650 \
  -in secrets-1/kafka-1.csr \
  -CA ca.crt \
  -CAkey ca.key \
  -CAcreateserial \
  -out secrets-1/kafka-1.crt \
  -extfile secrets-1/kafka-1.cnf \
  -extensions v3_req
The kafka-1.cnf file contains the configuration for the Kafka broker certificate. You can create this file with the following content:
[req]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
req_extensions = v3_req

[ dn ]
countryName = US
organizationName = CONFLUENT
localityName = MountainView
commonName=kafka-1

[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign

[ v3_req ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names

[ alt_names ]
DNS.1=kafka-1
DNS.2=kafka-1-external
DNS.3=localhost
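Since hostname verification on the SSL listener relies on the subjectAltName entries above, it's worth confirming that they actually ended up in the signed certificate:

openssl x509 -in secrets-1/kafka-1.crt -noout -text | grep -A 1 "Subject Alternative Name"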
Now, we can convert the server certificate and key into a PKCS12 bundle:
openssl pkcs12 -export \
  -in secrets-1/kafka-1.crt \
  -inkey secrets-1/kafka-1.key \
  -chain \
  -CAfile ca.pem \
  -name kafka-1 \
  -out secrets-1/kafka-1.p12 \
  -password pass:confluent
This command should give you a kafka-1.p12 file that contains the server certificate and key. After that, you must create the broker keystore and import the certificate:
keytool -importkeystore \
  -deststorepass confluent \
  -destkeystore secrets-1/kafka.kafka-1.keystore.pkcs12 \
  -srckeystore secrets-1/kafka-1.p12 \
  -deststoretype PKCS12 \
  -srcstoretype PKCS12 \
  -noprompt \
  -srcstorepass confluent
Finally, save the credentials (these are the password files the Confluent Docker image reads):

echo "confluent" > secrets-1/kafka-1_sslkey_creds
echo "confluent" > secrets-1/kafka-1_keystore_creds
You can use this command to verify the broker keystore:
keytool -list -v \
  -keystore secrets-1/kafka.kafka-1.keystore.pkcs12 \
  -storepass confluent
You can now replicate these steps for the other brokers and clients. Make sure to update the configuration files and names accordingly.
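If you'd rather not repeat each command by hand, the whole per-broker sequence can be scripted. The loop below is a minimal sketch that assumes secrets-2/kafka-2.cnf and secrets-3/kafka-3.cnf already exist with the matching commonName and subjectAltName entries, and that you're happy reusing the confluent password everywhere:

for i in 1 2 3; do
  dir="secrets-$i"
  name="kafka-$i"

  # Create the broker key and certificate signing request
  openssl req -new -newkey rsa:2048 -nodes \
    -keyout "$dir/$name.key" -out "$dir/$name.csr" -config "$dir/$name.cnf"

  # Sign the CSR with the certificate authority
  openssl x509 -req -days 3650 -in "$dir/$name.csr" \
    -CA ca.crt -CAkey ca.key -CAcreateserial \
    -out "$dir/$name.crt" -extfile "$dir/$name.cnf" -extensions v3_req

  # Bundle the key and certificate into a PKCS12 file
  openssl pkcs12 -export -in "$dir/$name.crt" -inkey "$dir/$name.key" \
    -chain -CAfile ca.pem -name "$name" -out "$dir/$name.p12" -password pass:confluent

  # Import the PKCS12 bundle into the broker keystore
  keytool -importkeystore -noprompt \
    -srckeystore "$dir/$name.p12" -srcstoretype PKCS12 -srcstorepass confluent \
    -destkeystore "$dir/kafka.$name.keystore.pkcs12" -deststoretype PKCS12 -deststorepass confluent

  # Save the credential files expected by the Confluent images
  echo "confluent" > "$dir/${name}_sslkey_creds"
  echo "confluent" > "$dir/${name}_keystore_creds"
done

Each iteration produces the same files we created manually for kafka-1, just under the matching secrets-N directory.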
Setting up a Kafka cluster using Docker
Now that we have our keys and certificates ready, we can set up a Kafka cluster using Docker.
You can use the following docker-compose.yml file to create a Kafka cluster with SSL enabled:
version: "3.5"

services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    restart: always
    container_name: zookeeper-1
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    volumes:
      - ./data-zk-log-1:/var/lib/zookeeper/log
      - ./data-zk-data-1:/var/lib/zookeeper/data
    networks:
      - confluent
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=12181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    restart: always
    container_name: zookeeper-2
    hostname: zookeeper-2
    ports:
      - "22181:22181"
    volumes:
      - ./data-zk-log-2:/var/lib/zookeeper/log
      - ./data-zk-data-2:/var/lib/zookeeper/data
    networks:
      - confluent
    environment:
      - ZOOKEEPER_SERVER_ID=2
      - ZOOKEEPER_CLIENT_PORT=22181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    restart: always
    container_name: zookeeper-3
    hostname: zookeeper-3
    ports:
      - "32181:32181"
    volumes:
      - ./data-zk-log-3:/var/lib/zookeeper/log
      - ./data-zk-data-3:/var/lib/zookeeper/data
    networks:
      - confluent
    environment:
      - ZOOKEEPER_SERVER_ID=3
      - ZOOKEEPER_CLIENT_PORT=32181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

  kafka-1:
    image: confluentinc/cp-kafka:latest
    restart: always
    container_name: kafka-1
    hostname: kafka-1
    ports:
      - "19092:19092"
      - "19093:19093"
    networks:
      - confluent
    volumes:
      - ./data-kafka-1:/var/lib/kafka/data
      - ./secrets-1:/etc/kafka/secrets
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,SSL://0.0.0.0:19093,BROKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092,SSL://localhost:19093,BROKER://kafka-1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,BROKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka-1.keystore.pkcs12
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka-1_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka-1_sslkey_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

  kafka-2:
    image: confluentinc/cp-kafka:latest
    restart: always
    container_name: kafka-2
    hostname: kafka-2
    ports:
      - "29092:29092"
      - "29093:29093"
    networks:
      - confluent
    volumes:
      - ./data-kafka-2:/var/lib/kafka/data
      - ./secrets-2:/etc/kafka/secrets
    environment:
      KAFKA_BROKER_ID: 102
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,SSL://0.0.0.0:29093,BROKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,SSL://localhost:29093,BROKER://kafka-2:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,BROKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka-2.keystore.pkcs12
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka-2_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka-2_sslkey_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

  kafka-3:
    image: confluentinc/cp-kafka:latest
    restart: always
    container_name: kafka-3
    hostname: kafka-3
    ports:
      - "39092:39092"
      - "39093:39093"
    networks:
      - confluent
    volumes:
      - ./data-kafka-3:/var/lib/kafka/data
      - ./secrets-3:/etc/kafka/secrets
    environment:
      KAFKA_BROKER_ID: 103
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:22181,zookeeper-3:32181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,SSL://0.0.0.0:39093,BROKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:39092,SSL://localhost:39093,BROKER://kafka-3:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,BROKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka-3.keystore.pkcs12
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka-3_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka-3_sslkey_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

volumes:
  data-zk-log-1:
  data-zk-data-1:
  data-zk-log-2:
  data-zk-data-2:
  data-zk-log-3:
  data-zk-data-3:
  data-kafka-1:
  data-kafka-2:
  data-kafka-3:

networks:
  confluent:
You can now start the Kafka cluster using the following command:
docker compose up -d
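Before pointing any clients at the cluster, it can be worth a quick sanity check that the SSL listener actually presents a certificate signed by our CA. From the project directory on the host, a command like the following should end with "Verify return code: 0 (ok)":

openssl s_client -connect localhost:19093 -CAfile ca.crt </dev/null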
With your Docker containers running, you can open a terminal session using this command:
docker compose exec -it kafka-1 bash
We'll use it to create our first Kafka topic:
kafka-topics --bootstrap-server localhost:19092 \
  --create --topic messages \
  --replica-assignment 101:102:103
The command above creates a topic named messages with the replica assignment 101:102:103. You can now start producing and consuming messages using this Kafka cluster.
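If you want to smoke-test the SSL listener with the console clients before wiring up Laravel, recent Kafka clients (2.7+) accept a PEM truststore directly. The sketch below assumes you've also copied ca.crt into each secrets-N directory so it's available inside the containers at /etc/kafka/secrets/ca.crt; run it from the shell we opened inside kafka-1:

cat > /tmp/client-ssl.properties <<EOF
security.protocol=SSL
ssl.truststore.type=PEM
ssl.truststore.location=/etc/kafka/secrets/ca.crt
EOF

# Produce a few messages over the SSL listener
kafka-console-producer --bootstrap-server localhost:19093 \
  --topic messages \
  --producer.config /tmp/client-ssl.properties

# ...and read them back
kafka-console-consumer --bootstrap-server localhost:19093 \
  --topic messages \
  --from-beginning \
  --consumer.config /tmp/client-ssl.properties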
Connecting to our cluster using Laravel Kafka
Now that we have a Kafka cluster running with SSL enabled, let's connect to it using Laravel Kafka. Here's an example of how you can create a consumer and connect to the cluster using SSL.
If you don't have Laravel Kafka installed yet, you can install it by following the installation docs available here.
// Within a Laravel command:

use Junges\Kafka\Contracts\ConsumedMessage;
use Junges\Kafka\Contracts\MessageConsumer;
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(brokers: 'localhost:19093')
    ->subscribe('messages')
    ->withOptions([
        'ssl.ca.location' => 'path-to/ca.crt',
        'ssl.certificate.location' => 'path-to/ca.pem',
        'ssl.key.location' => 'path-to/ca.key',
        'ssl.key.password' => 'confluent',
        'enable.ssl.certificate.verification' => 'true',
        'security.protocol' => 'ssl',
    ])
    ->withSecurityProtocol('SSL')
    ->withHandler(function (ConsumedMessage $message, MessageConsumer $messageConsumer) {
        // Handle the consumed message
    })
    ->onStopConsuming(function () {
        $this->line('Consumer stopped.');
    })
    ->build();

$consumer->consume();
In closing
Setting up Kafka with SSL encryption helps you secure your data and ensures that only authorized clients can connect to your brokers. While the setup involves quite a few steps, it provides robust protection for your Kafka cluster. As with any security feature, it's important to regularly review and renew your certificates and keys to maintain security over time.
If you're looking to scale your Kafka deployment or implement advanced security features like mutual authentication or ACLs, this SSL setup is an essential first step.
If you like this post, consider sharing it with your friends and colleagues. If you have any questions or feedback, feel free to reach out to me on Twitter.
If you had any issues following along, you can find the complete code for this blog post on GitHub.