Kafka Client Configuration

blog-post

So many applications leverage environment variables for configuration, especially when they are deployed within a container. With just a little bit of code, you can leverage the same behavior for your Java Kafka clients.

Introduction

The Kafka configuration convention makes it super easy-to-use environment variables for configuration. The convention of all properties are lowercase and all separation is with periods, properties can be pulled safely from the environment.

Overrides

  • loop through all environment variables, filtering out only those with a prefix, such as “KAFKA_CLIENT_”.
  • replace all “_” characters with “.” and lowercase everything. If _ ever becomes a client property, use an escape (e.g. __ -> _).
  • abstract out the System.getEnv() to allow for a means to use an alternate source for unit-testing (implementation details left as an exercise).
private static Map<String, String> environment() {
  return System.getEnv();
}

public static Map<String, String> environmentProperties(String prefix) {
  return environment().entrySet().stream()
      .filter(e -> e.getKey().startsWith(prefix))
      .map(e -> {
        String key = e.getKey().substring(prefix.length()).replace("_", ".").toLowerCase();
        return Map.entry(key, e.getValue());
      })
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

Now at the heart of it, that’s it; but yes, it isn’t quite that simple.

Connection (& Secrets)

Now, the handling of secrets typically requires more restrictions within an organization. Typically, I prefer to load all connection settings from a shared-secret mounted in the container. By putting all connection settings in this file, e.g. bootstrap.servers, security.protocol, sasl.mechanism, and any secrets those settings that change based where it is deployed, are part of the container initialization; not application configuration.

public static Map<String, Object> load(String propertyFile) {
  try {
    File file = new File(propertyFile);
    if (file.exists() && file.isFile()) {
      Properties properties = new Properties();
      try (InputStream is = new FileInputStream(file)){
        properties.load(is);
        return new HashMap<>(properties.entrySet().stream()
            .collect(Collectors.toMap(e->e.getKey().toString(),Map.Entry::getValue)));
      } 
    } else {
      throw new RuntimeException("unable to read property file.");
    }
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

Now conneciton information extracted from an actual property file, there are more secure ways of pulling that data into the container.

Putting it all Together

Now, when developing an application; the development team would knoe what are more ideal defaults and what settings cannot be modified (if any). Use a 4 step process of:

private Map<String, Object> properties() {

  Map<String, Object> map = new HashMap<>();
  1. Defaults
  map.putAll(Map.ofEntries(
        Map.entry(ProducerConfig.BATCH_SIZE_CONFIG, 200_000L),
        Map.entry(ProducerConfig.LINGER_MS_CONFIG, 200L),
        Map.entry(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
  ));
  1. Overrides
  map.putAll(KafkaUtil.environmentProperties("KAFKA_CLIENT_"));
  1. Connection Properties (Secrets)
  map.putAll(PropertiesUtil.load("/secrets/connection.properties"));
  1. Immutables
  map.putAll(Map.ofEntries(
        Map.entry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
        Map.entry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
  ));
  return map;
}

Summary

With just a little bit code, it is easy to make an easy dynamically configure your kafka clients while keeping connection and secret properties secure.