Kafka从入门到进阶

1. Apache Kafka是一个分布式流平台

1.1 流平台有三个关键功能:

发布和订阅流记录,类似于一个消息队列或企业消息系统

以一种容错的持久方式存储记录流

在流记录生成的时候就处理它们

1.2 Kafka通常用于两大类应用:

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

构建实时流数据管道,在系统或应用程序之间可靠地获取数据

构建对数据流进行转换或输出的实时流媒体应用程序

1.3 有几个特别重要的概念:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.

The Kafka cluster stores streams of records in categories called topics.

Each record consists of a key, a value, and a timestamp.

Kafka作为集群运行在一个或多个可以跨多个数据中心的服务器上

从这句话表达了三个意思:

Kafka是以集群方式运行的

集群中可以只有一台服务器,也有可能有多台服务器。也就是说,一台服务器也是一个集群,多台服务器也可以组成一个集群

这些服务器可以跨多个数据中心

Kafka集群按分类存储流记录,这个分类叫做主题

这句话表达了以下几个信息:

流记录是分类存储的,也就说记录是归类的

我们称这种分类为主题

简单地来讲,记录是按主题划分归类存储的

每个记录由一个键、一个值和一个时间戳组成

1.4 Kafka有四个核心API:

Producer API?:允许应用发布一条流记录到一个或多个主题

Consumer API?:允许应用订阅一个或多个主题,并处理流记录

Streams API?:允许应用作为一个流处理器,从一个或多个主题那里消费输入流,并将输出流输出到一个或多个输出主题,从而有效地讲输入流转换为输出流

Connector API?:允许将主题连接到已经存在的应用或者数据系统,以构建并允许可重用的生产者或消费者。例如,一个关系型数据库的连接器可能捕获到一张表的每一次变更

(画外音:我理解这四个核心API其实就是:发布、订阅、转换处理、从第三方采集数据。)

在Kafka中,客户端和服务器之间的通信是使用简单的、高性能的、与语言无关的TCP协议完成的。

2. Topics and Logs(主题和日志)

一个topic是一个分类,或者说是记录被发布的时候的一个名字(画外音:可以理解为记录要被发到哪儿去)。

在Kafka中,topic总是有多个订阅者,因此,一个topic可能有0个,1个或多个订阅该数据的消费者。

对于每个主题,Kafka集群维护一个分区日志,如下图所示:

每个分区都是一个有序的、不可变的记录序列,而且记录会不断的被追加,一条记录就是一个结构化的提交日志(a structured commit log)。

分区中的每条记录都被分配了一个连续的id号,这个id号被叫做offset(偏移量),这个偏移量唯一的标识出分区中的每条记录。(PS:如果把分区比作数据库表的话,那么偏移量就是主键)

Kafka集群持久化所有已发布的记录,无论它们有没有被消费,记录被保留的时间是可以配置的。例如,如果保留策略被设置为两天,那么在记录发布后的两天内,可以使用它,之后将其丢弃以释放空间。在对数据大小方面,Kafka的性能是高效的,恒定常量级的,因此长时间存储数据不是问题。

事实上,唯一维护在每个消费者上的元数据是消费者在日志中的位置或者叫偏移量。偏移量是由消费者控制的:通常消费者在读取记录的时候会线性的增加它的偏移量,但是,事实上,由于位置(偏移量)是由消费者控制的,所有它可以按任意它喜欢的顺序消费记录。例如:一个消费者可以重置到一个较旧的偏移量来重新处理之前已经处理过的数据,或者跳转到最近的记录并从“现在”开始消费。

这种特性意味着消费者非常廉价————他们可以来来去去的消息而不会对集群或者其它消费者造成太大影响。

日志中的分区有几个用途。首先,它们允许日志的规模超出单个服务器的大小。每个独立分区都必须与宿主的服务器相匹配,但一个主题可能有多个分区,所以它可以处理任意数量的数据。第二,它们作为并行的单位——稍后再进一步。

画外音:简单地来说,日志分区的作用有两个:一、日志的规模不再受限于单个服务器;二、分区意味着可以并行。

什么意思呢?主题建立在集群之上,每个主题维护了一个分区日志,顾名思义,日志是分区的;每个分区所在的服务器的资源(比如:CPU、内存、带宽、磁盘等)是有限的,如果不分区(可以理解为等同于只有一个)的话,必然受限于这个分区所在的服务器,那么多个分区的话就不一样了,就突破了这种限制,服务器可以随便加,分区也可以随便加。

3. Distribution(分布)

日志的分区分布在集群中的服务器上,每个服务器处理数据,并且分区请求是共享的。每个分区被复制到多个服务器上以实现容错,到底复制到多少个服务器上是可以配置的。

Each partition is replicated across a configurable number of servers for fault tolerance.

每个分区都有一个服务器充当“leader”角色,并且有0个或者多个服务器作为“followers”。leader处理对这个分区的所有读和写请求,而followers被动的从leader那里复制数据。如果leader失败,followers中的其中一个会自动变成新的leader。每个服务器充当一些分区的“leader”的同时也是其它分区的“follower”,因此在整个集群中负载是均衡的。

也就是说,每个服务器既是“leader”也是“follower”。我们知道一个主题可能有多个分区,一个分区可能在一个服务器上也可能跨多个服务器,然而这并不以为着一台服务器上只有一个分区,是可能有多个分区的。每个分区中有一个服务器充当“leader”,其余是“follower”。leader负责处理这个它作为leader所负责的分区的所有读写请求,而该分区中的follow只是被动复制leader的数据。这个有点儿像HDFS中的副本机制。例如:分区-1有服务器A和B组成,A是leader,B是follower,有请求要往分区-1中写数据的时候就由A处理,然后A把刚才写的数据同步给B,这样的话正常请求相当于A和B的数据是一样的,都有分区-1的全部数据,如果A宕机了,B成为leader,接替A继续处理对分区-1的读写请求。

需要注意的是,分区是一个虚拟的概念,是一个逻辑单元。

4. Producers(生产者)

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

生产者发布数据到它们选择的主题中。生产者负责选择将记录投递到哪个主题的哪个分区中。要做这件事情,可以简单地用循环方式以到达负载均衡,或者根据一些语义分区函数(比如:基于记录中的某些key)

5. Consumers(消费者)

消费者用一个消费者组名来标识它们自己(PS:相当于给自己贴一个标签,标签的名字是组名,以表明自己属于哪个组),并且每一条发布到主题中的记录只会投递给每个订阅的消费者组中的其中一个消费者实例。消费者实例可能是单独的进程或者在单独的机器上。

如果所有的消费者实例都使用相同的消费者组,那么记录将会在这些消费者之间有效的负载均衡。

如果所有的消费者实例都使用不同的消费者组,那么每条记录将会广播给所有的消费者进程。

上图中其实那个Kafka Cluster换成Topic会更准确一些

一个Kafka集群有2个服务器,4个分区(P0-P3),有两个消费者组。组A中有2个消费者实例,组B中有4个消费者实例。

通常我们会发现,主题不会有太多的消费者组,每个消费者组是一个“逻辑订阅者”(以消费者组的名义订阅主题,而非以消费者实例的名义去订阅)。每个组由许多消费者实例组成,以实现可扩展性和容错。这仍然是发布/订阅,只不过订阅者是一个消费者群体,而非单个进程。

在Kafka中,这种消费方式是通过用日志中的分区除以使用者实例来实现的,这样可以保证在任意时刻每个消费者都是排它的消费,即“公平共享”。Kafka协议动态的处理维护组中的成员。如果有心的实例加入到组中,它们将从组中的其它成员那里接管一些分区;如果组中有一个实例死了,那么它的分区将会被分给其它实例。

(画外音:什么意思呢?举个例子,在上面的图中,4个分区,组A有2个消费者,组B有4个消费者,那么对A来讲组中的每个消费者负责4/2=2个分区,对组B来说组中的每个消费者负责4/4=1个分区,而且同一时间消息只能被组中的一个实例消费。如果组中的成员数量有变化,则重新分配。)

Kafka只提供分区下的记录的总的顺序,而不提供主题下不同分区的总的顺序。每个分区结合按key划分数据的能力排序对大多数应用来说是足够的。然而,如果你需要主题下总的记录顺序,你可以只使用一个分区,这样做的做的话就意味着每个消费者组中只能有一个消费者实例。

6. 保证

在一个高级别的Kafka给出下列保证:

被一个生产者发送到指定主题分区的消息将会按照它们被发送的顺序追加到分区中。也就是说,如果记录M1和M2是被同一个生产者发送到同一个分区的,而且M1是先发送的,M2是后发送的,那么在分区中M1的偏移量一定比M2小,并且M1出现在日志中的位置更靠前。

一个消费者看到记录的顺序和它们在日志中存储的顺序是一样的。

对于一个副本因子是N的主题,我们可以容忍最多N-1个服务器失败,而不会丢失已经提交给日志的任何记录。

7. Spring Kafka

Spring提供了一个“模板”作为发送消息的高级抽象。它也通过使用@KafkaListener注释和“监听器容器”提供对消息驱动POJOs的支持。这些库促进了依赖注入和声明式的使用。

7.1 纯Java方式

1 package com.cjs.example.quickstart;

2

3 import org.apache.kafka.clients.consumer.ConsumerConfig;

4 import org.apache.kafka.clients.consumer.ConsumerRecord;

5 import org.apache.kafka.clients.producer.ProducerConfig;

6 import org.apache.kafka.common.serialization.IntegerDeserializer;

7 import org.apache.kafka.common.serialization.IntegerSerializer;

8 import org.apache.kafka.common.serialization.StringDeserializer;

9 import org.apache.kafka.common.serialization.StringSerializer;

10 import org.springframework.kafka.core.*;

11 import org.springframework.kafka.listener.KafkaMessageListenerContainer;

12 import org.springframework.kafka.listener.MessageListener;

13 import org.springframework.kafka.listener.config.ContainerProperties;

14

15 import java.util.HashMap;

16 import java.util.Map;

17

18 public class PureJavaDemo {

19

20 /**

21 * 生产者配置

22 */

23 private static Map senderProps() {

24 Map props = new HashMap<>();

25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");

26 props.put(ProducerConfig.RETRIES_CONFIG, 0);

27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

30 return props;

31 }

32

33 /**

34 * 消费者配置

35 */

36 private static Map consumerProps() {

37 Map props = new HashMap<>();

38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");

39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello");

40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

45 return props;

46 }

47

48 /**

49 * 发送模板配置

50 */

51 private static KafkaTemplate createTemplate() {

52 Map senderProps = senderProps();

53 ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProps);

54 KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);

55 return kafkaTemplate;

56 }

57

58 /**

59 * 消息监听器容器配置

60 */

61 private static KafkaMessageListenerContainer createContainer() {

62 Map consumerProps = consumerProps();

63 ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);

64 ContainerProperties containerProperties = new ContainerProperties("test");

65 KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

66 return container;

67 }

68

69

70 public static void main(String[] args) throws InterruptedException {

71 String topic1 = "test"; // 主题

72

73 KafkaMessageListenerContainer container = createContainer();

74 ContainerProperties containerProperties = container.getContainerProperties();

75 containerProperties.setMessageListener(new MessageListener() {

76 @Override

77 public void onMessage(ConsumerRecord record) {

78 System.out.println("Received: " + record);

79 }

80 });

81 container.setBeanName("testAuto");

82

83 container.start();

84

85 KafkaTemplate kafkaTemplate = createTemplate();

86 kafkaTemplate.setDefaultTopic(topic1);

87

88 kafkaTemplate.sendDefault(0, "foo");

89 kafkaTemplate.sendDefault(2, "bar");

90 kafkaTemplate.sendDefault(0, "baz");

91 kafkaTemplate.sendDefault(2, "qux");

92

93 kafkaTemplate.flush();

94 container.stop();

95

96 System.out.println("结束");

97 }

98

99 }

运行结果:

Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo)

Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar)

Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz)

Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)

7.2 更简单一点儿,用SpringBoot

1 package com.cjs.example.quickstart;

2

3 import org.apache.kafka.clients.consumer.ConsumerRecord;

4 import org.springframework.beans.factory.annotation.Autowired;

5 import org.springframework.boot.CommandLineRunner;

6 import org.springframework.context.annotation.Bean;

7 import org.springframework.context.annotation.Configuration;

8 import org.springframework.kafka.annotation.KafkaListener;

9 import org.springframework.kafka.core.KafkaTemplate;

10

11 @Configuration

12 public class JavaConfigurationDemo {

13

14 @KafkaListener(topics = "test")

15 public void listen(ConsumerRecord record) {

16 System.out.println("收到消息: " + record);

17 }

18

19 @Bean

20 public CommandLineRunner commandLineRunner() {

21 return new MyRunner();

22 }

23

24 class MyRunner implements CommandLineRunner {

25

26 @Autowired

27 private KafkaTemplate kafkaTemplate;

28

29 @Override

30 public void run(String... args) throws Exception {

31 kafkaTemplate.send("test", "foo1");

32 kafkaTemplate.send("test", "foo2");

33 kafkaTemplate.send("test", "foo3");

34 kafkaTemplate.send("test", "foo4");

35 }

36 }

37 }

application.properties配置

spring.kafka.bootstrap-servers=192.168.101.5:9092

spring.kafka.consumer.group-id=world

8. 生产者

1 package com.cjs.example.send;

2

3 import org.apache.kafka.clients.producer.ProducerConfig;

4 import org.apache.kafka.common.serialization.IntegerSerializer;

5 import org.apache.kafka.common.serialization.StringSerializer;

6 import org.springframework.context.annotation.Bean;

7 import org.springframework.context.annotation.Configuration;

8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;

9 import org.springframework.kafka.core.KafkaTemplate;

10 import org.springframework.kafka.core.ProducerFactory;

11

12 import java.util.HashMap;

13 import java.util.Map;

14

15 @Configuration

16 public class Config {

17

18 public Map producerConfigs() {

19 Map props = new HashMap<>();

20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");

21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

23 return props;

24 }

25

26 public ProducerFactory producerFactory() {

27 return new DefaultKafkaProducerFactory<>(producerConfigs());

28 }

29

30 @Bean

31 public KafkaTemplate kafkaTemplate() {

32 return new KafkaTemplate(producerFactory());

33 }

34

35 }

1 package com.cjs.example.send;

2

3 import org.springframework.beans.factory.annotation.Autowired;

4 import org.springframework.boot.CommandLineRunner;

5 import org.springframework.kafka.core.KafkaTemplate;

6 import org.springframework.kafka.support.SendResult;

7 import org.springframework.stereotype.Component;

8 import org.springframework.util.concurrent.ListenableFuture;

9 import org.springframework.util.concurrent.ListenableFutureCallback;

10

11 @Component

12 public class MyCommandLineRunner implements CommandLineRunner {

13

14 @Autowired

15 private KafkaTemplate kafkaTemplate;

16

17 public void sendTo(Integer key, String value) {

18 ListenableFuture> listenableFuture = kafkaTemplate.send("test", key, value);

19 listenableFuture.addCallback(new ListenableFutureCallback>() {

20 @Override

21 public void onFailure(Throwable throwable) {

22 System.out.println("发送失败啦");

23 throwable.printStackTrace();

24 }

25

26 @Override

27 public void onSuccess(SendResult sendResult) {

28 System.out.println("发送成功," + sendResult);

29 }

30 });

31 }

32

33 @Override

34 public void run(String... args) throws Exception {

35 sendTo(1, "aaa");

36 sendTo(2, "bbb");

37 sendTo(3, "ccc");

38 }

39

40

41 }

运行结果:

发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37]

发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38]

发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]

9. 消费者@KafkaListener

1 package com.cjs.example.receive;

2

3 import org.apache.kafka.clients.consumer.ConsumerConfig;

4 import org.apache.kafka.clients.consumer.ConsumerRecord;

5 import org.apache.kafka.common.serialization.IntegerDeserializer;

6 import org.apache.kafka.common.serialization.StringDeserializer;

7 import org.springframework.context.annotation.Bean;

8 import org.springframework.context.annotation.Configuration;

9 import org.springframework.kafka.annotation.KafkaListener;

10 import org.springframework.kafka.annotation.TopicPartition;

11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

12 import org.springframework.kafka.config.KafkaListenerContainerFactory;

13 import org.springframework.kafka.core.ConsumerFactory;

14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

15 import org.springframework.kafka.listener.AbstractMessageListenerContainer;

16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

17 import org.springframework.kafka.listener.config.ContainerProperties;

18 import org.springframework.kafka.support.Acknowledgment;

19 import org.springframework.kafka.support.KafkaHeaders;

20 import org.springframework.messaging.handler.annotation.Header;

21 import org.springframework.messaging.handler.annotation.Payload;

22

23 import java.util.HashMap;

24 import java.util.List;

25 import java.util.Map;

26

27 @Configuration

28 public class Config2 {

29

30 @Bean

31 public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

32 ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

33 factory.setConsumerFactory(consumerFactory());

34 factory.setConcurrency(3);

35 ContainerProperties containerProperties = factory.getContainerProperties();

36 containerProperties.setPollTimeout(2000);

37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

38 return factory;

39 }

40

41 private ConsumerFactory consumerFactory() {

42 return new DefaultKafkaConsumerFactory<>(consumerProps());

43 }

44

45 private Map consumerProps() {

46 Map props = new HashMap<>();

47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");

48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha");

49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

52 return props;

53 }

54

55

56 @KafkaListener(topics = "test")

57 public void listen(String data) {

58 System.out.println("listen 收到: " + data);

59 }

60

61

62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")

63 public void listen2(String data, Acknowledgment ack) {

64 System.out.println("listen2 收到: " + data);

65 ack.acknowledge();

66 }

67

68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")})

69 public void listen3(ConsumerRecord record) {

70 System.out.println("listen3 收到: " + record.value());

71 }

72

73

74 @KafkaListener(id = "xyz", topics = "test")

75 public void listen4(@Payload String foo,

76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

79 @Header(KafkaHeaders.OFFSET) List offsets) {

80 System.out.println("listen4 收到: ");

81 System.out.println(foo);

82 System.out.println(key);

83 System.out.println(partition);

84 System.out.println(topic);

85 System.out.println(offsets);

86 }

87

88 }

9.1 Committing Offsets

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

如果enable.auto.commit设置为true,那么kafka将自动提交offset。如果设置为false,则支持下列AckMode(确认模式)。

消费者poll()方法将返回一个或多个ConsumerRecords

RECORD :处理完记录以后,当监听器返回时,提交offset

BATCH :当对poll()返回的所有记录进行处理完以后,提交偏offset

TIME :当对poll()返回的所有记录进行处理完以后,只要距离上一次提交已经过了ackTime时间后就提交

COUNT :当poll()返回的所有记录都被处理时,只要从上次提交以来收到了ackCount条记录,就可以提交

COUNT_TIME :和TIME以及COUNT类似,只要这两个中有一个为true,则提交

MANUAL :消息监听器负责调用Acknowledgment.acknowledge()方法,此后和BATCH是一样的

MANUAL_IMMEDIATE :当监听器调用Acknowledgment.acknowledge()方法后立即提交

10. Spring Boot Kafka

10.1 application.properties

spring.kafka.bootstrap-servers=192.168.101.5:9092

10.2 发送消息

1 package com.cjs.example;

2

3 import org.springframework.beans.factory.annotation.Autowired;

4 import org.springframework.kafka.core.KafkaTemplate;

5 import org.springframework.web.bind.annotation.RequestMapping;

6 import org.springframework.web.bind.annotation.RestController;

7

8 import javax.annotation.Resource;

9

10 @RestController

11 @RequestMapping("/msg")

12 public class MessageController {

13

14 @Resource

15 private KafkaTemplate kafkaTemplate;

16

17 @RequestMapping("/send")

18 public String send(String topic, String key, String value) {

19 kafkaTemplate.send(topic, key, value);

20 return "ok";

21 }

22

23 }

10.3 接收消息

1 package com.cjs.example;

2

3 import org.apache.kafka.clients.consumer.ConsumerRecord;

4 import org.springframework.kafka.annotation.KafkaListener;

5 import org.springframework.kafka.annotation.KafkaListeners;

6 import org.springframework.stereotype.Component;

7

8 @Component

9 public class MessageListener {

10

11 /**

12 * 监听订单消息

13 */

14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup")

15 public void listenToOrder(String data) {

16 System.out.println("收到订单消息:" + data);

17 }

18

19 /**

20 * 监听会员消息

21 */

22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")

23 public void listenToMember(ConsumerRecord record) {

24 System.out.println("收到会员消息:" + record);

25 }

26

27 /**

28 * 监听所有消息

29 *

30 * 任意时刻,一条消息只会发给组中的一个消费者

31 *

32 * 消费者组中的成员数量不能超过分区数,这里分区数是1,因此订阅该主题的消费者组成员不能超过1

33 */

34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"),

35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")})

36 // public void listenToAll(String data) {

37 // System.out.println("啊啊啊");

38 // }

39

40 }

11. pom.xml

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.cjs.example

cjs-kafka-example

0.0.1-SNAPSHOT

jar

cjs-kafka-example

org.springframework.boot

spring-boot-starter-parent

2.0.4.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter-web

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容