Java Client

Lets create topic, publish couple of messages into the topic and then consume the messages.

Producing messages

We produce 10 messages and send them to Kafka.

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) throws Exception {
        new MyProducer().runProducer(10);
    }

    public void runProducer(int sendMessageCount) throws Exception {
        try (Producer<Long, String> producer = createProducer()) {
            for (long index = 0; index < sendMessageCount; index++) {
                send(producer, index);
            }
        }
    }

    private void send(Producer<Long, String> producer, long index) throws InterruptedException, ExecutionException {
        ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "My message " + index);
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                record.key(), record.value(), metadata.partition(),
                metadata.offset());
    }

    private Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

}

Consuming messages

This code starts new thread which reads messages that have been produced in last 1 second.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {

    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) {
        Runnable consumer = () -> {
            while (true) {
                new MyConsumer().run(1000);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(consumer).start();
    }

    public void run(int timeout) {
        Consumer<Long, String> consumer = createProducer();
        ConsumerRecords<Long, String> fromKafka = consumer.poll(timeout);
        System.out.println(fromKafka.count());
        fromKafka.forEach(it -> {
//            System.out.println(it.value());
        });
        consumer.commitAsync();
        consumer.close();
    }

    private Consumer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(TOPIC));
        return consumer;
    }
}

When we start MyConsumer and then produce some messages using MyProducer, we will se this in the console.

0
0
0
10
0
10
0
1
9
0

Last updated