Quarkus + Kafka Producer and Consumer Example
In this article, you will see simple Kafka producer and consumer examples in Quarkus that rely on annotations. The article does not aim to teach Quarkus or Kafka; it assumes you are already familiar with both.
Start by adding the required dependencies to your build file.
The snippets below use the Gradle build tool. If you use Maven, look up the equivalent coordinates on Maven Central.
Kafka Producer
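To use a Kafka producer in your Quarkus application with Gradle, include the following dependencies in your build.gradle file:
dependencies {
    // Plain Kafka client API (KafkaProducer, ProducerRecord)
    implementation 'io.quarkus:quarkus-kafka-client'
    // JAX-RS support for the REST endpoint, if the project does not already have it
    implementation 'io.quarkus:quarkus-resteasy'
}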
In your Quarkus application code, you can use the following example to send messages to a Kafka topic:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

@Path("/kafka-producer")
public class KafkaProducerResource {

    @Inject
    KafkaProducer<String, String> kafkaProducer;

    // Accepts a plain-text request body and forwards it to Kafka.
    @POST
    @Path("/send")
    @Consumes(MediaType.TEXT_PLAIN)
    public void sendMessage(String message) {
        // No key is set, so the producer's partitioner picks the partition.
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", message);
        // send() is asynchronous; it returns before the broker acknowledges the record.
        kafkaProducer.send(record);
    }
}
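In the above example, the KafkaProducer is injected into the KafkaProducerResource class, and the sendMessage method sends each request body as a message to the my-topic Kafka topic. The injected KafkaProducer has to be supplied by the application, for example through a CDI producer method, because the Kafka client extension does not register one on its own. Rather than hard-coding the topic name, you can move it into the application's properties file.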
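To make the last point concrete, here is a minimal JDK-only sketch of reading the topic name from property-file text, alongside the basic settings a KafkaProducer<String, String> is usually constructed from. The class name ProducerSettingsSketch, the property key app.kafka.topic, and the broker address localhost:9092 are assumptions made up for this illustration; they do not come from the example above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ProducerSettingsSketch {

    // Hypothetical property key for the topic name (an assumption for this sketch).
    static final String TOPIC_KEY = "app.kafka.topic";

    // Standard Kafka producer settings for String keys and String values.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Parses property-file text and returns the topic name,
    // falling back to "my-topic" when the key is absent.
    static String topicName(String propertyFileText) {
        Properties app = new Properties();
        try {
            app.load(new StringReader(propertyFileText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return app.getProperty(TOPIC_KEY, "my-topic");
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092"));
        System.out.println(topicName("app.kafka.topic=orders"));
    }
}
```

In a real application the Properties object would be passed to the KafkaProducer constructor, and the value returned by topicName would replace the hard-coded "my-topic" string in sendMessage.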
Kafka Consumer
To use a Kafka consumer in your Quarkus application with Gradle, include the following dependency in your build.gradle file:
dependencies {
    implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
}
And in your Quarkus application code, you can use the following example to consume messages from a Kafka topic:
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class KafkaConsumer {

    // Called once for every record received on the "my-topic" channel.
    @Incoming("my-topic")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
In the above example, the KafkaConsumer class is annotated with @ApplicationScoped to make it a CDI bean. The @Incoming("my-topic") annotation names the channel the method listens to. When no topic is configured for that channel, the Kafka connector uses the channel name as the topic, so this method receives messages from the my-topic Kafka topic. The consume method is invoked once per record, with the record's value passed in as a String, and prints it.
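The name passed to @Incoming identifies a channel, and configuration ties that channel to a Kafka topic. The JDK-only sketch below spells out the configuration keys involved, so the naming scheme is visible. The class IncomingChannelConfigSketch and the broker address localhost:9092 are hypothetical. In a Quarkus project these entries would normally be written in application.properties, and recent Quarkus versions may infer some of them, so treat this as an illustration of the key names rather than a required setup.

```java
import java.util.Properties;

class IncomingChannelConfigSketch {

    // Builds the configuration key for one attribute of an incoming channel.
    static String incomingKey(String channel, String attribute) {
        return "mp.messaging.incoming." + channel + "." + attribute;
    }

    // Collects the entries that wire an incoming channel to a Kafka topic.
    static Properties channelConfig(String channel, String topic, String bootstrapServers) {
        Properties config = new Properties();
        config.setProperty("kafka.bootstrap.servers", bootstrapServers);
        config.setProperty(incomingKey(channel, "connector"), "smallrye-kafka");
        config.setProperty(incomingKey(channel, "topic"), topic);
        config.setProperty(incomingKey(channel, "value.deserializer"),
                "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // Print the entries in key=value form, as they would appear in a properties file.
        channelConfig("my-topic", "my-topic", "localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Setting the topic attribute explicitly lets the channel name in @Incoming differ from the Kafka topic name, which helps when the topic name varies between environments.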
I hope this helps!
More such articles:
https://www.youtube.com/channel/UCiTaHm1AYqMS4F4L9zyO7qA