# Java Client

Lets create topic, publish couple of messages into the topic and then consume the messages.

## Producing messages

We produce 10 messages and send them to Kafka.

```
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) throws Exception {
        new MyProducer().runProducer(10);
    }

    public void runProducer(int sendMessageCount) throws Exception {
        try (Producer<Long, String> producer = createProducer()) {
            for (long index = 0; index < sendMessageCount; index++) {
                send(producer, index);
            }
        }
    }

    private void send(Producer<Long, String> producer, long index) throws InterruptedException, ExecutionException {
        ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "My message " + index);
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                record.key(), record.value(), metadata.partition(),
                metadata.offset());
    }

    private Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

}
```

## Consuming messages

This code starts new thread which reads messages that have been produced in last 1 second.

```
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {

    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String[] args) {
        Runnable consumer = () -> {
            while (true) {
                new MyConsumer().run(1000);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(consumer).start();
    }

    public void run(int timeout) {
        Consumer<Long, String> consumer = createProducer();
        ConsumerRecords<Long, String> fromKafka = consumer.poll(timeout);
        System.out.println(fromKafka.count());
        fromKafka.forEach(it -> {
//            System.out.println(it.value());
        });
        consumer.commitAsync();
        consumer.close();
    }

    private Consumer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(TOPIC));
        return consumer;
    }
}
```

When we start `MyConsumer` and then produce some messages using `MyProducer`, we will se this in the console.

```
0
0
0
10
0
10
0
1
9
0
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://ondrej-kvasnovsky-2.gitbook.io/kafka-handbook/java.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
