Java Client

Let's create a topic, publish a couple of messages to it, and then consume those messages.

Producing messages

We produce 10 messages and send them to Kafka.

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Minimal Kafka producer example.
 *
 * <p>Publishes a fixed number of records to {@code my-topic}, each keyed by a
 * {@code long} index, and prints the partition and offset reported for every
 * record that was sent.
 */
public class MyProducer {
    private static final String TOPIC = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    /** Entry point: sends ten messages and exits. */
    public static void main(String[] args) throws Exception {
        MyProducer app = new MyProducer();
        app.runProducer(10);
    }

    /**
     * Sends {@code sendMessageCount} records with keys {@code 0 .. sendMessageCount - 1}.
     *
     * <p>The producer is opened for the duration of the call and closed when the
     * method exits, whether normally or because a send failed.
     *
     * @param sendMessageCount number of records to send; values of zero or less send nothing
     * @throws Exception if waiting for any send result fails
     */
    public void runProducer(int sendMessageCount) throws Exception {
        try (Producer<Long, String> kafka = buildProducer()) {
            long key = 0;
            while (key < sendMessageCount) {
                publish(kafka, key);
                key++;
            }
        }
    }

    /**
     * Sends one record and waits on the returned future before printing its metadata,
     * so records are sent strictly one after another.
     */
    private void publish(Producer<Long, String> kafka, long key)
            throws InterruptedException, ExecutionException {
        String payload = "My message " + key;
        ProducerRecord<Long, String> outgoing = new ProducerRecord<>(TOPIC, key, payload);
        RecordMetadata ack = kafka.send(outgoing).get();
        System.out.printf(
                "sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                outgoing.key(), outgoing.value(), ack.partition(), ack.offset());
    }

    /** Builds a producer with {@code Long} keys and {@code String} values for the configured brokers. */
    private Producer<Long, String> buildProducer() {
        Properties settings = new Properties();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        settings.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new KafkaProducer<>(settings);
    }

}

Consuming messages

This code starts a new thread which reads messages that have been produced in the last 1 second.

When we start MyConsumer and then produce some messages using MyProducer, we will see this in the console.

Last updated

Was this helpful?