This series goes through conversion of some basic java kafka clients to scala - step by step. It is important to understand that it is written from my viewpoint - someone who has played with scala, likes it, but has never really had time to get into it.
You will need to have the correct initial setup - see the introduction
Basic Java clients
On the kafka course one of the first things you develop is a basic consumer and producer in java.
Producer
Let's take a look at the basic producer. There is one main java file here - BasicProducer.java
The main method does three main things:
Properties settings = new Properties();
settings.put("client.id", "basic-producer");
settings.put("bootstrap.servers", "localhost:29092");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
This sets up the configuration of the kafka producer that we want and says we are working with records that use String both as key and value.
KafkaProducer<String, String> producer = new KafkaProducer<>(settings);
This creates the producer with the supplied configuration.
for (int i = 1; i <= 5; i++) {
final String key = "key-" + i;
final String value = "value-" + i;
System.out.println("### Sending " + i + " ###");
final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
producer.send(record);
}
This posts 5 messages to the topic hello-world-topic
Build and run the producer
This is a standard maven project - simply open in your favourite IDE and build.
The BasicProducer class contains a standard main method - so you should easily be able to run the code from the IDE too. You can also use the maven-exec plugin (see the readme).
The output isn't wildly exciting (we haven't configured logging so ignore related lines):
*** Starting Basic Producer ***
### Sending 1 ###
### Sending 2 ###
### Sending 3 ###
### Sending 4 ###
### Sending 5 ###
So - let's see if we can see what was added to the topic using a consumer.
Consumer
There is again one main java file here - BasicConsumer.java
The main method again does three main things:
Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
This sets up the configuration of the kafka consumer that we want and says we are working with records that use String both as key and value.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings);
This creates the consumer with the supplied configuration.
consumer.subscribe(Collections.singletonList("hello-world-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),record.value());
}
}
This subscribes to the topic hello-world-topic
and prints out whatever it finds there.
Build and run the consumer
This is also a standard maven project - again simply open in your favourite IDE and build then run the main method in BasicConsumer or use maven directly from the command line.
The consumer output now shows the messages that the basic producer sent to the topic:
*** Starting Basic Consumer ***
offset = 0, key = key-1, value = value-1
offset = 1, key = key-2, value = value-2
offset = 2, key = key-3, value = value-3
offset = 3, key = key-4, value = value-4
offset = 4, key = key-5, value = value-5
Since the consumer listens until interrupted - break out with Ctrl-C.
Summary
So - we have now a basic consumer and producer in java. Our next step will be a basic scala variant.