Big Data Series: Producing and Consuming Kafka Messages in Java
Published: 2019-06-09


Java source code (GitHub):

For the Kafka installation steps, see this earlier article:

The previous article used shell commands to produce and consume Kafka messages. This article shows how to do the same message production and consumption from Java.

1. Code structure, as shown in the figure

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>mfz.kafka</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <kafka.version>0.10.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <finalName>dist</finalName>
                    <appendAssemblyId>true</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
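
With the assembly plugin bound to the package phase as above, a normal build should also produce a self-contained jar. As a sketch (the exact output file name is an assumption based on the finalName and appendAssemblyId settings above):

    mvn clean package
    # expected (assuming the assembly config above): target/dist-jar-with-dependencies.jar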

3. Producer: KafkaProducerExample.java

package bigdata.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author mengfanzhu
 * @Package bigdata.kafka
 * @Description: Kafka producer example
 * @date 17/3/8 17:20
 */
public class KafkaProducerExample {

    public void produceMessage() {
        Properties props = getConfig();
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String topic = "slavetest", key, value;
        for (int i = 0; i < 1000; i++) {
            key = "key" + i;
            value = "value" + i;
            System.out.println("TOPIC: slavetest; sending KEY: " + key + "; VALUE: " + value);
            producer.send(new ProducerRecord<String, String>(topic, key, value));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }

    // Producer configuration
    public Properties getConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.211.55.3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducerExample example = new KafkaProducerExample();
        example.produceMessage();
    }
}
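
The send() above is fire-and-forget: the loop never learns whether a record actually reached the broker. As a minimal sketch (an addition, not part of the original example, using the standard kafka-clients Callback API), you can pass a callback as the second argument to send() to log each acknowledgement:

    // Hedged sketch (not in the original code); requires two extra imports:
    // import org.apache.kafka.clients.producer.Callback;
    // import org.apache.kafka.clients.producer.RecordMetadata;
    producer.send(new ProducerRecord<String, String>(topic, key, value), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();  // the send failed (after any configured retries)
            } else {
                System.out.println("acked: partition " + metadata.partition()
                        + ", offset " + metadata.offset());
            }
        }
    });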

 

4. Consumer: KafkaConsumerExample.java

package bigdata.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author mengfanzhu
 * @Package bigdata.kafka
 * @Description: Kafka consumer example
 * @date 17/3/8 17:21
 */
public class KafkaConsumerExample {

    // Consumer configuration
    public static Properties getConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.211.55.3:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public void consumeMessage() {
        // Launch a fixed pool of consumer threads (one here; raise numConsumers to scale out)
        int numConsumers = 1;
        final String topic = "slavetest";
        final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
        for (int i = 0; i < numConsumers; i++) {
            KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (KafkaConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // Runnable that owns one KafkaConsumer (KafkaConsumer is not thread-safe, so one per thread)
    public static class KafkaConsumerRunner implements Runnable {

        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        public KafkaConsumerRunner(String topic) {
            Properties props = getConfig();
            consumer = new KafkaConsumer<String, String>(props);
            this.topic = topic;
        }

        public void handleRecord(ConsumerRecord<String, String> record) {
            System.out.println("name: " + Thread.currentThread().getName()
                    + " ; topic: " + record.topic()
                    + " ; offset: " + record.offset()
                    + " ; key: " + record.key()
                    + " ; value: " + record.value());
        }

        public void run() {
            try {
                // Subscribe to the topic
                consumer.subscribe(Arrays.asList(topic));
                while (!closed.get()) {
                    // Poll for new records
                    ConsumerRecords<String, String> records = consumer.poll(10000);
                    for (ConsumerRecord<String, String> record : records) {
                        handleRecord(record);
                    }
                }
            } catch (WakeupException e) {
                // Ignore the wakeup exception if we are closing
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerExample example = new KafkaConsumerExample();
        example.consumeMessage();
    }
}
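
Because the config sets enable.auto.commit=true, offsets are committed in the background on a timer, so a crash between a commit and handleRecord() can silently skip records. If you need at-least-once processing, a hedged variant (an assumption about your requirements, not something the original code does) is to set enable.auto.commit to "false" in getConfig() and commit manually after each batch is handled:

    // Sketch of the poll loop body inside run(), assuming enable.auto.commit = "false":
    ConsumerRecords<String, String> records = consumer.poll(10000);
    for (ConsumerRecord<String, String> record : records) {
        handleRecord(record);
    }
    consumer.commitSync();  // commit offsets only after the whole batch has been processed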

5. Run results, as shown in the figure

Note: there is not much code here, but running it as-is against a remote broker may fail to consume messages or throw an exception like "...Failed to send messages after 3 tries..". The cause is that when connecting to Kafka remotely, the broker's address must be configured explicitly (strictly speaking, the network interface, i.e. the NIC, that the broker listens on).

Solution:

Edit config/server.properties; the relevant setting is commented out by default (see the sketch below).
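
As a sketch, in the Kafka 0.10.x config/server.properties the listener lines ship commented out; uncomment them and point them at an address the client can actually reach. The IP below matches the one used in the Java code above and is an assumption about your environment:

    # config/server.properties
    # assumption: 10.211.55.3 is your broker host, reachable from the client
    listeners=PLAINTEXT://10.211.55.3:9092
    advertised.listeners=PLAINTEXT://10.211.55.3:9092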

 

Reposted from: https://www.cnblogs.com/cnmenglang/p/6522247.html
