This series goes through the conversion of some basic Java Kafka clients to Scala - step by step. It is important to understand that it is written from my viewpoint - someone who has played with Scala, likes it, but has never really had time to get into it.
In the previous step we created a basic producer and consumer in Scala, but it was very close to a line-by-line conversion. Let's try for something closer to normal Scala - and let's get the config values out to a configuration file.
App boilerplate
First change - instead of having an object with a main method - let's actually state that it's an app.
object X {
  def main(args: Array[String]): Unit = {
    // Application code
  }
}
changes to
object X extends App {
  // Application code
}
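One small bonus: the App trait still gives us the command-line arguments through its inherited args member, so for example:
object X extends App {
  // args: Array[String] is provided by the App trait
  println(args.mkString(", "))
}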
PureConfig
The config values are hard-coded in the clients - let's do something about that.
For this we'll use the PureConfig library.
Dependency
We need to add the following to build.sbt:
"com.github.pureconfig" %% "pureconfig" % "0.12.0"
This is added to the list of libraryDependencies that is already present.
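For context, the dependency list in build.sbt then looks something like this - a sketch, with the kafka-clients entry and version assumed from the earlier steps in the series:
libraryDependencies ++= Seq(
  // assumed carried over from the previous steps
  "org.apache.kafka" % "kafka-clients" % "2.3.0",
  "com.github.pureconfig" %% "pureconfig" % "0.12.0"
)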
Configuration file
We can add our configuration values to a file under src/main/resources called application.conf.
Producer
client-id = "pureconfig-producer"
bootstrap-servers = "localhost:29092"
topic = "pureconfig-topic"
serializer = "org.apache.kafka.common.serialization.StringSerializer"
Consumer
group-id = "pureconfig-consumer"
bootstrap-servers = "localhost:29092"
topic = "pureconfig-topic"
deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
enable-auto-commit = "true"
auto-commit-interval-ms = "1000"
auto-offset-reset = "earliest"
Loading configuration
PureConfig can load the values into a matching case class. By default it maps the kebab-case keys in the file to camelCase field names, so bootstrap-servers populates bootstrapServers.
Producer
case class Config(clientId: String,
                  bootstrapServers: String,
                  topic: String,
                  serializer: String)
Consumer
case class Config(groupId: String,
                  bootstrapServers: String,
                  enableAutoCommit: String,
                  autoCommitIntervalMs: String,
                  autoOffsetReset: String,
                  deserializer: String,
                  topic: String)
Loading successful?
We can choose how to handle a config load so that we know whether the configuration was loaded OK or not.
One method we can call is loadOrThrow, which will throw an exception if it can't load the configuration.
val conf = ConfigSource.default.loadOrThrow[Config]
Another option is just to use load - this returns, in effect, an Either - where the Left side is ConfigReaderFailures and the Right side is configuration matching the case class asked for.
ConfigSource.default.load[Config] match {
  case Left(errors) => ...
  case Right(config: Config) => ...
}
Config asProperties
There is one thing that stops us using our nice neat Config case classes as-is: the Kafka clients (KafkaProducer/KafkaConsumer) require a Java Properties object.
For now - I've simply created an asProperties method on each Config case class. However - there are other ways of handling this conversion (usually the word implicit turns up here - but for this example let's keep it simple).
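As a taste of that alternative, here is a minimal sketch of an extension method via an implicit class - assuming the producer Config case class shown below - rather than defining asProperties on the case class itself. This is just an illustration, not the approach used in the final clients:
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig._

object ConfigOps {
  // Hypothetical extension: adds asProperties to Config from the outside
  implicit class ProducerConfigOps(config: Config) {
    def asProperties: Properties = {
      val props = new Properties()
      props.put(CLIENT_ID_CONFIG, config.clientId)
      props.put(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
      props.put(KEY_SERIALIZER_CLASS_CONFIG, config.serializer)
      props.put(VALUE_SERIALIZER_CLASS_CONFIG, config.serializer)
      props
    }
  }
}
With import ConfigOps._ in scope at the call site, config.asProperties then reads exactly as it does in the versions below.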
Updated clients
We'll throw in some other small tidying up - this gives the following clients:
Producer
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig.{BOOTSTRAP_SERVERS_CONFIG, CLIENT_ID_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import pureconfig.ConfigSource
import pureconfig.generic.auto._
case class Config(clientId: String,
                  bootstrapServers: String,
                  topic: String,
                  serializer: String) {
  def asProperties: Properties = {
    val props = new Properties()
    props.put(CLIENT_ID_CONFIG, clientId)
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(KEY_SERIALIZER_CLASS_CONFIG, serializer)
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, serializer)
    props
  }
}

object ConfigProducer extends App {
  ConfigSource.default.load[Config] match {
    case Left(errors) =>
      println(errors)
      System.exit(1)
    case Right(config: Config) =>
      println("*** Starting Config Producer ***")
      val producer = new KafkaProducer[String, String](config.asProperties)
      (1 to 5).foreach { i =>
        producer.send(new ProducerRecord[String, String](config.topic, s"key-$i", s"value-$i"))
      }
      producer.close(Duration.ofMillis(100))
      println("### Stopping Config Producer ###")
  }
}
Consumer
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.consumer.KafkaConsumer
import pureconfig.ConfigSource
import pureconfig.generic.auto._
import scala.collection.JavaConverters._
case class Config(groupId: String,
                  bootstrapServers: String,
                  enableAutoCommit: String,
                  autoCommitIntervalMs: String,
                  autoOffsetReset: String,
                  deserializer: String,
                  topic: String) {
  def asProperties: Properties = {
    val props = new Properties()
    props.put(GROUP_ID_CONFIG, groupId)
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit)
    props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs)
    props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    props
  }
}

object ConfigConsumer extends App {
  ConfigSource.default.load[Config] match {
    case Left(errors) =>
      println(errors)
      System.exit(1)
    case Right(config: Config) =>
      println("*** Starting Config Consumer ***")
      val consumer = new KafkaConsumer[String, String](config.asProperties)
      try {
        consumer.subscribe(List(config.topic).asJava)
        while (true) {
          val records = consumer.poll(Duration.ofMillis(100)).asScala
          for (record <- records) {
            println(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
          }
        }
      } finally {
        consumer.close()
      }
  }
}
Build and Run
For each client - we can check that it compiles and run it using sbt as before:
sbt compile
sbt's run command will also find objects that extend App, so we can still run:
sbt run
Producer output:
*** Starting Config Producer ***
### Stopping Config Producer ###
Consumer output (the consumer polls in an endless loop, so stop it with Ctrl-C when done):
*** Starting Config Consumer ***
offset = 0, key = key-1, value = value-1
offset = 1, key = key-2, value = value-2
offset = 2, key = key-3, value = value-3
offset = 3, key = key-4, value = value-4
offset = 4, key = key-5, value = value-5
Summary
In this step we tidied up the producer and consumer a little and moved our configuration out to a config file.