Quarkus + Kafka producer and consumer example!

In this article, you will learn simple Kafka producer and consumer examples using annotations. The purpose of this article is not to teach Quarkus or Kafka; it assumes you are already familiar with both.

Each example starts with the dependency to add to your build file. The snippets use Gradle; if you use Maven, look up the equivalent coordinates on Maven Central.

Kafka Producer

To use a Kafka producer in your Quarkus application, add the Kafka client extension to your build.gradle file (the Kafka Streams extension is only needed for stream-processing topologies):

dependencies {
    implementation 'io.quarkus:quarkus-kafka-client'
}

In your Quarkus application code, you can use the following example to send messages to a Kafka topic:

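import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

// Quarkus 3 and later use the jakarta.* namespace instead of javax.*.
@Path("/kafka-producer")
public class KafkaProducerResource {

    // Assumes a CDI producer method elsewhere in the application
    // creates this KafkaProducer<String, String> bean.
    @Inject
    KafkaProducer<String, String> kafkaProducer;

    @POST
    @Path("/send")
    @Consumes(MediaType.TEXT_PLAIN)
    public void sendMessage(String message) {
        // Build a record without a key, so Kafka chooses the partition.
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", message);
        // send() is asynchronous and returns a Future with the record metadata.
        kafkaProducer.send(record);
    }
}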

In the above example, a KafkaProducer is injected into the KafkaProducerResource class, and the sendMessage method sends the request body to the my-topic Kafka topic. Rather than hard-coding the topic name, you can move it into the application's properties file.
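
Moving the topic name out of the code can be sketched in plain Java. The example below is a minimal, framework-free illustration: it reads a hypothetical key, app.kafka.topic, from properties text and falls back to my-topic when the key is absent. The key name and the TopicConfig class are assumptions made for this sketch; in a real Quarkus application the value would more likely live in application.properties and be injected with MicroProfile Config's @ConfigProperty annotation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: resolves the Kafka topic name from configuration
// instead of hard-coding it in the resource class.
final class TopicConfig {

    // Assumed property key; it is not defined by Quarkus or Kafka.
    static final String TOPIC_KEY = "app.kafka.topic";
    static final String DEFAULT_TOPIC = "my-topic";

    // Parses properties text and returns the configured topic,
    // or the default when the key is missing.
    static String resolveTopic(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(TOPIC_KEY, DEFAULT_TOPIC);
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic("app.kafka.topic=orders"));
        System.out.println(resolveTopic(""));
    }
}
```

The resolved name would then be passed to the ProducerRecord constructor in place of the "my-topic" literal.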

Kafka Consumer

To use a Kafka consumer in your Quarkus application, add the following dependency to your build.gradle file:

dependencies {
    implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
}

And in your Quarkus application code, you can use the following example to consume messages from a Kafka topic:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class KafkaConsumer {

    // Called once for every record received on the "my-topic" channel.
    @Incoming("my-topic")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

In the above example, the KafkaConsumer class is annotated with @ApplicationScoped to make it a CDI bean. The @Incoming("my-topic") annotation marks consume as the handler for the my-topic channel, which maps to the Kafka topic of the same name unless configured otherwise. The framework calls consume once for each message it reads from the topic, passing the payload as the method argument.
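
The link between a channel name and a Kafka topic is set through configuration. The sketch below shows a few commonly used SmallRye Reactive Messaging keys as properties text, then mimics the documented default in plain Java: use the channel's topic property when present, otherwise fall back to the channel name. It is an illustration only, not SmallRye's implementation; the ChannelTopicResolver class, the localhost:9092 broker address, and the orders topic are assumptions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: mimics how a channel name maps to a Kafka topic.
final class ChannelTopicResolver {

    // Sample configuration in application.properties syntax.
    // The broker address and the "orders" topic are assumed values.
    static final String SAMPLE_CONFIG = String.join("\n",
            "kafka.bootstrap.servers=localhost:9092",
            "mp.messaging.incoming.my-topic.connector=smallrye-kafka",
            "mp.messaging.incoming.my-topic.topic=orders");

    // Parses properties text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the topic configured for an incoming channel,
    // or the channel name itself when no topic is set.
    static String topicFor(String channel, Properties config) {
        String key = "mp.messaging.incoming." + channel + ".topic";
        return config.getProperty(key, channel);
    }

    public static void main(String[] args) {
        Properties config = parse(SAMPLE_CONFIG);
        System.out.println(topicFor("my-topic", config));
        System.out.println(topicFor("audit", config));
    }
}
```

With this sample configuration, the consume method above would read from the orders topic while the code keeps referring to the my-topic channel.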

I hope this helps you!
