Java源码GitBub地址:
关于kafka安装步骤可见文章
在上篇文章中使用shell 命令处理了kafka的消息生产与消息消费。下面介绍Java语言对kafka的消息生产与消息消费的处理。
1.代码结构如图
2.pom.xml
4.0.0 mfz.kafka kafka-demo 1.0-SNAPSHOT 0.10.2.0 org.apache.kafka kafka-clients ${kafka.version} org.apache.kafka kafka_2.11 ${kafka.version} maven-assembly-plugin 2.3 dist true jar-with-dependencies make-assembly package single
2 . 生产者 KafKaProducerExample.java
package bigdata.kafka;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/** * @author mengfanzhu * @Package bigdata.kafka * @Description: kafka生产者 * @date 17/3/8 17:20 */public class KafkaProducerExample { public void produceMessage() { Properties props = getConfig(); Producerproducer = new KafkaProducer (props); String topic="slavetest",key,value; for (int i = 0; i < 1000; i++) { key = "key"+i; value="value"+i; System.out.println("TOPIC: slavetest;发送KEY:"+key+";Value:"+value); producer.send(new ProducerRecord (topic, key,value)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } producer.close(); } // config public Properties getConfig() { Properties props = new Properties(); props.put("bootstrap.servers", "10.211.55.3:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } public static void main(String[] args) { KafkaProducerExample example = new KafkaProducerExample(); example.produceMessage(); }}
3.消费者 KafkaConsumerExample.java
package bigdata.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;/** * @author mengfanzhu * @Package bigdata.kafka * @Description: kafka 消费者 * @date 17/3/8 17:21 */public class KafkaConsumerExample { //config public static Properties getConfig() { Properties props = new Properties(); props.put("bootstrap.servers", "10.211.55.3:9092"); props.put("group.id", "testGroup"); props.put("enable.auto.commit", "true"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } public void consumeMessage() { // launch 3 threads to consume int numConsumers = 1; final String topic = "slavetest"; final ExecutorService executor = Executors.newFixedThreadPool(numConsumers); final Listconsumers = new ArrayList (); for (int i = 0; i < numConsumers; i++) { KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic); consumers.add(consumer); executor.submit(consumer); } Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (KafkaConsumerRunner consumer : consumers) { consumer.shutdown(); } executor.shutdown(); try { executor.awaitTermination(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }); } // Thread to consume kafka data public static class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; private final String topic; public KafkaConsumerRunner(String topic) { Properties props = getConfig(); consumer = new KafkaConsumer (props); this.topic = topic; } public void handleRecord(ConsumerRecord record) { System.out.println("name: " + Thread.currentThread().getName() + " ; topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value()); } public void run() { try { // subscribe 订阅`topic consumer.subscribe(Arrays.asList(topic)); while (!closed.get()) { //read data ConsumerRecords records = consumer.poll(10000); // Handle new records for (ConsumerRecord record : records) { handleRecord(record); } } } catch (WakeupException e) { // Ignore exception if closing e.printStackTrace(); if (!closed.get()) { throw e; } } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } } public static void main(String[] args) { KafkaConsumerExample example = new KafkaConsumerExample(); example.consumeMessage(); }}
4.运行效果
附加:代码不多,但是这样直接运行可能会导致消息消费失败,或是异常 "...Failed to send messages after 3 tries.."。原因是因为远程链接kafka时需要指定broker的地址(严格来说是所监听的网络接口,或者网卡)。
解决方案:
修改config/service.properties文件,默认是注释的。