Kafka Consumer using Akka-Core
by Ayush Vatsyayan
I faced this issue when working on a project last week. So I had to add a Kafka consumer in the project in order to write an integration test case. Now kafka consumer is pretty straightforward when using akka-stream, but the project had an earlier version of Akka (2.4.8) on which akka-stream wasn’t supported.
Being pretty new to Akka, I googled to see if I can find some akka core solution of creating a kafka consumer, but nothing existed except one using akka-streams. The only other solution was using a java while loop, wherein the code continuously consume messages. This approach is a correct solution, however I cannot use it because whole project is written in scala this seemed quite an ugly solution to me.
For the solution I’ve used akka and then send the akka messages to start and stop the kafka message consumption.
Consider this java version of kafka consumer - here we are consuming kafka messages in the while loop.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100)
for (ConsumerRecord<String, String> record: records)
// consume here
}
Now to make it an akka code we have to replace this while loop with the akka messages. For this we will define two scala case objects CONSUME
and STOP
.
case object CONSUME
case object STOP
First let’s define properties for kafka consumer
import java.util.properties
val props = new Properties()
props.put("bootstrap.servers", "127.0.0.1:9909")
props.put("group.id", "test")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
Let’s define kafka consumer and the collection to hold consumed messages.
import org.apache.kafka.clients.consumer.KafkaConsumer
var consumedMsgs = Map[String, String]()
var consumer: KafkaConsumer[String, String] = null
override def preStart() {
consumer = new KafkaConsumer(props)
consumer.subscribe(List("some topic").asJava)
}
Now we will define our consumer to receive CONSUME
and STOP
messages.
override def wrappedReceive: Receive = {
case CONSUME =>
val records = consumer.poll(100)
records.asScala.foreach { x =>
consumedMsgs += (x.topic -> x.value)
}
self ! CONSUME // keep consuming messages
case STOP => // do nothing for now
}
Now to mimic while loop in scala we are sending CONSUME
to self. This will be trigerred as soon as client code sends CONSUME
.
We are receiving messages now, but we need to stop consuming messages when client wants. For this client will send the STOP
message and we will make consumer as null.
To stop consuming messages we will also have to add a null check when consuming CONSUME
message. Here is how it looks now
override def wrappedReceive: Receive = {
case CONSUME if consumer != null => // consumer only if consumer isn't null
val records = consumer.poll(10)
records.asScala.foreach { x =>
consumedMsgs += (x.topic -> x.value)
}
self ! CONSUME // keep consuming messages
case STOP =>
if (consumer != null) {
consumer.close()
consumer = null // set consumer to null
}
}
This is a pretty straightforward way but works for a project where scala and akka are used. We can also use Akka FSM here, which I’ll cover some other time.
Subscribe via RSS