Java Client
Lets create topic, publish couple of messages into the topic and then consume the messages.
Producing messages
We produce 10 messages and send them to Kafka.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyProducer {
private final static String TOPIC = "my-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
public static void main(String[] args) throws Exception {
new MyProducer().runProducer(10);
}
public void runProducer(int sendMessageCount) throws Exception {
try (Producer<Long, String> producer = createProducer()) {
for (long index = 0; index < sendMessageCount; index++) {
send(producer, index);
}
}
}
private void send(Producer<Long, String> producer, long index) throws InterruptedException, ExecutionException {
ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "My message " + index);
RecordMetadata metadata = producer.send(record).get();
System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
record.key(), record.value(), metadata.partition(),
metadata.offset());
}
private Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
}Consuming messages
This code starts new thread which reads messages that have been produced in last 1 second.
When we start MyConsumer and then produce some messages using MyProducer, we will se this in the console.
Last updated
Was this helpful?