Handling Secrets in the Kafka Connect Configuration
In one of my previous posts you have seen that the main components are Kafka and the Kafka connectors. There are two different types of connectors (source and sink). Confluent has written these quite nicely and provides interfaces through which we can configure them. The configuration for the source and sink connectors is JSON: some elements are standard and dictated by the type of the Kafka connector, while other parts are defined by the specific source and sink connectors.
Take, for example, the source and sink connector configurations in our CDC architecture.
Creating the secrets
First, we will create two simple properties files called connect-source.properties and amqp.properties,
which should look like this:
# File: /secrets/connect-source.properties
database.hostname=db
database.port=5432
database.user=services
database.password=services
database.dbname=services
database.server.name=dbanalytics

# File: /secrets/amqp.properties
hostname=rabbitmq
port=5672
username=guest
password=guest
analytics.exchange.type=fanout
analytics.exchange=analytics-exchange
analytics.routing.key=analytics
- These properties files can be created from environment variables set in the docker-compose.yml together with a small shell script, as follows:
# docker-compose.yml
connect:
  container_name: connect
  build:
    context: ./docker/connect
    dockerfile: Dockerfile
  ports:
    - 18083:8083
  volumes:
    - ./docker/connect:/connect-volume
  depends_on:
    - zookeeper
    - kafka
    - db
    - schema-registry
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=1
    # ...
    - CONNECT_SOURCE_DATABASE_HOSTNAME=db
    - CONNECT_SOURCE_DATABASE_PORT=5432
    - CONNECT_SOURCE_DATABASE_USER=services
    - CONNECT_SOURCE_DATABASE_PASSWORD=services
    - CONNECT_SOURCE_DATABASE_DBNAME=services
    - CONNECT_SOURCE_DATABASE_SERVER_NAME=dbanalytics
    - AMQP_HOSTNAME=rabbitmq
    - AMQP_PORT=5672
    - AMQP_USERNAME=guest
    - AMQP_PASSWORD=guest
    - AMQP_ANALYTICS_EXCHANGE=analytics-exchange
    - AMQP_ANALYTICS_EXCHANGE_TYPE=fanout
    - AMQP_ANALYTICS_ROUTING_KEY=analytics
- Shell script which creates the properties files from the environment variables passed via docker-compose.yml (a couple of practical notes on wiring this together follow after the script):
#!/bin/bash
set -e

mkdir -p /secrets

ENV_FILE="/secrets/env"
AMQP_PROP_FILE="/secrets/amqp.properties"
CONNECT_SOURCE_PROP_FILE="/secrets/connect-source.properties"

# Create (or truncate) the generated files
echo "# Environment variables (auto-generated)" > "${ENV_FILE}"
echo "# Secrets file (auto-generated)" > "${AMQP_PROP_FILE}"
echo "# Secrets file (auto-generated)" > "${CONNECT_SOURCE_PROP_FILE}"

#
# Process all environment variables that start with 'AMQP_'
# and 'CONNECT_SOURCE_'
#
env | while IFS= read -r VAR
do
  echo "${VAR}" >> "${ENV_FILE}"
  PREFIX=""
  PROP_FILE=""
  if [[ ${VAR} =~ ^AMQP_ ]]; then
    PREFIX="AMQP"
    PROP_FILE="${AMQP_PROP_FILE}"
  elif [[ ${VAR} =~ ^CONNECT_SOURCE_ ]]; then
    PREFIX="CONNECT_SOURCE"
    PROP_FILE="${CONNECT_SOURCE_PROP_FILE}"
  fi
  if [[ "${PREFIX}" != "" ]]; then
    # Strip the prefix, lower-case the name and turn '_' into '.'
    # e.g. AMQP_ANALYTICS_EXCHANGE -> analytics.exchange
    prop_name=$(echo "${VAR%%=*}" | sed -e "s/^${PREFIX}_//" | tr '[:upper:]' '[:lower:]' | tr _ .)
    # Everything after the first '=' is the value (it may itself contain '=')
    prop_val="${VAR#*=}"
    echo "--- Setting property : ${prop_name}=${prop_val}"
    echo "${prop_name}=${prop_val}" >> "${PROP_FILE}"
  fi
done
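Two practical notes on the setup above. First, the script has to run inside the connect container before the Connect worker starts, so that the property files already exist when the connector configurations are resolved. A minimal sketch of an entrypoint that would do this; the script name create-secrets.sh and the /etc/confluent/docker/run launch command are assumptions based on a Confluent-style base image:
#!/bin/bash
# Assumed entrypoint: generate the property files first, then hand over
# to the normal Connect worker startup of the base image.
/connect-volume/create-secrets.sh
exec /etc/confluent/docker/run
Second, depending on the Kafka Connect version and image, the FileConfigProvider typically also has to be enabled on the Connect worker itself, not only referenced from the connector configuration. With a Confluent cp-kafka-connect base image, which maps CONNECT_-prefixed environment variables to worker properties, this could look roughly like the following (the exact variable names are an assumption based on that convention):
  environment:
    # ... the variables shown above ...
    # assumed: enable the file config provider on the Connect worker
    - CONNECT_CONFIG_PROVIDERS=file
    - CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider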
Using the secrets in the configuration
- Kafka Connect provides the org.apache.kafka.common.config.provider.FileConfigProvider to read values from any simple properties file. The following source and sink connector JSON configurations show how to use it to read the properties that we defined above.
- Source Debezium connector configuration for the PostgreSQL DB
{
  "name": "source-connector",
  "config": {
    "_comment": "Push WAL messages to Kafka topic using PostgresConnector",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "providers": "file",
    "providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
    "plugin.name": "wal2json",
    "schema.refresh.mode": "columns_diff_exclude_unchanged_toast",
    "slot.stream.params": "include-not-null=true",
    "tasks.max": "1",
    "database.hostname": "${file:/secrets/connect-source.properties:database.hostname}",
    "database.port": "${file:/secrets/connect-source.properties:database.port}",
    "database.user": "${file:/secrets/connect-source.properties:database.user}",
    "database.password": "${file:/secrets/connect-source.properties:database.password}",
    "database.dbname": "${file:/secrets/connect-source.properties:database.dbname}",
    "database.server.name": "${file:/secrets/connect-source.properties:database.server.name}",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.${file:/secrets/connect-source.properties:database.server.name}",
    "time.precision.mode": "connect",
    "schema.whitelist": "${file:/secrets/connect-source.properties:database.dbname}",
    "table.whitelist": "${file:/secrets/connect-source.properties:database.dbname}.test_table, ${file:/secrets/connect-source.properties:database.dbname}.postgis_table",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:18081",
    "value.converter.schema.registry.url": "http://schema-registry:18081"
  }
}
- Sink connector configuration for RabbitMQ
{
  "name": "analytics-amqp-sink-connector",
  "config": {
    "_comment": "Stream data to AMQP broker from Kafka topic using CamelRabbitmqSinkConnector",
    "connector.class": "org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector",
    "providers": "file",
    "providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
    "tasks.max": "1",
    "camel.sink.endpoint.hostname": "${file:/secrets/amqp.properties:hostname}",
    "camel.sink.endpoint.portNumber": "${file:/secrets/amqp.properties:port}",
    "camel.sink.path.exchangeName": "${file:/secrets/amqp.properties:analytics.exchange}",
    "camel.sink.endpoint.exchangeType": "${file:/secrets/amqp.properties:analytics.exchange.type}",
    "camel.sink.endpoint.autoDelete": "false",
    "camel.sink.endpoint.routingKey": "${file:/secrets/amqp.properties:analytics.routing.key}",
    "camel.sink.endpoint.allowCustomHeaders": "true",
    "camel.sink.endpoint.lazyStartProducer": "true",
    "camel.sink.endpoint.username": "${file:/secrets/amqp.properties:username}",
    "camel.sink.endpoint.password": "${file:/secrets/amqp.properties:password}",
    "_comment": "topics to consume from",
    "topics.regex": "^${file:/secrets/connect-source.properties:database.server.name}.${file:/secrets/connect-source.properties:database.dbname}.(.*)",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.debezium.converters.ByteBufferConverter",
    "key.converter.schema.registry.url": "http://schema-registry:18081",
    "value.converter.schema.registry.url": "http://schema-registry:18081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}
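The connectors are registered through the Kafka Connect REST interface in the usual way; the ${file:...} placeholders do not change anything about that. A quick example, assuming the 18083 host port mapping from the docker-compose.yml above and that the source connector JSON has been saved locally as source-connector.json (a name chosen here just for illustration):
# Register the source connector (the sink follows the same pattern)
curl -X POST -H "Content-Type: application/json" \
  --data @source-connector.json \
  http://localhost:18083/connectors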
I have picked these two connectors here specifically to explain the approach, but in general reading secrets this way is agnostic of the specific connector.
Summary
This way we can hide the secrets, which might otherwise be quite easily visible through the Kafka Connect REST interface.
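For example, reading a connector's configuration back from the REST interface should return the unresolved placeholders rather than the actual credentials (again assuming the 18083 host port mapping):
curl http://localhost:18083/connectors/source-connector/config
# ...
# "database.password": "${file:/secrets/connect-source.properties:database.password}",
# ...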