springCloud --- 中级篇(3)

本系列笔记涉及到的代码在GitHub上,地址:https://github.com/zsllsz/cloud

本文涉及知识点:

  • 服务消息驱动之stream;

  • 服务链路追踪之sleuth;


欢迎大家关注我的公众号 javawebkf,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。


一、服务消息驱动之springcloud stream

1、是什么?
现在主流的消息中间件有以下四种:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

比如京东这个网站,可能用的是RabbitMQ,但是京东的大数据分析用的是Kafka,存在两种MQ,切换、维护和开发都不太方便。消息驱动就是可以让我们不再关注MQ的实现细节,我们只需要用一种适配绑定的方式,自动地给我们在各种MQ之间切换。简言之,就像JDBC连接数据库,我们不需要关心它怎么连接各个数据库厂商,只需要按照JDBC的编程模型去写代码就好;springcloud stream也是,相当于封装了常用的各种MQ(目前stram封装了RabbitMQ和kafka),我们只需要调它的binder对象,通过它去操作MQ就行了。

2、执行流程?

  • 消息生产者:生产消息(source) ---> 通道(channel) ---> 绑定器(binder)
  • 消息消费者:绑定器(binder) ---> 通道(channel) ---> 消费消息(sink)

3、编码实现:
(1)、消息生产者:

  • 新建一名名为cloud-stream-rabbitmq-provider8801的module,作为生产者
  • pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator </artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- eureka client -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- stream-rabbitmq -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • yml:
server:
  port: 8801
  
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置需要绑定的rabbitMQ的信息
        defaultRabbit: # 消息服务设置的名称,用于bindings整合
          type: rabbit # 消息组件类型
          environment: # rabbitmq环境配置
            spring: 
              rabbitmq:
                host: 192.168.0.106
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        output: # 通道名
          destination: test-stream # rabbitmq的exchange的名称
          content-type: application/json
          binder: defaultRabbit # 消息服务设置的名称
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka 
  • 主启动类:
@SpringBootApplication
@EnableEurekaClient
public class StreamMain8801 {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(StreamMain8801.class, args);
    }
}
  • service:
public interface IMessageProvider {
    public String send();
}
  • serviceImpl:
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider{
    @Autowired
    private MessageChannel output;

    @Override
    public void send() {
        String msg = "hello world";
        output.send(MessageBuilder.withPayload(msg).build());
        System.out.println("============== 发送成功 ==============");
    }
}
  • controller:
@RestController
@RequestMapping("/stream")
public class SendMessageController {
    @Autowired
    private IMessageProvider messageProvider;
    
    @GetMapping("/message")
    public String send() {
        messageProvider.send();
        return "success";
    }
}

启动rabbitmq,再启动7001,然后启动8801,最后访问一下localhost:8801/stream/message,可以返回success,消息发送成功。

(2)、消息消费者:

  • 新建名为cloud-stream-rabbitmq-consumer8802,作为消费者
  • pom.xml:和8801的一样
  • yml:
server:
  port: 8802
 
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置需要绑定的rabbitMQ的信息
        defaultRabbit: # 消息服务设置的名称,用于bindings整合
          type: rabbit # 消息组件类型
          environment: # rabbitmq环境配置
            spring: 
              rabbitmq:
                host: 192.168.0.106
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        input: # 通道名
          destination: test-stream # rabbitmq的exchange的名称
          content-type: application/json
          binder: defaultRabbit # 消息服务设置的名称
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  • 主启动:和8801一样
  • 业务类:
@EnableBinding(Sink.class)
public class ConsumerController {
    @Value("${server.port}")
    private String serverPort;
    
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("======== 端口:" + serverPort + ",消息内容:" + message.getPayload());
    }
}

这样就搞定了,启动7001,8801,8802,然后用8801往mq发消息,在8802的控制台就可以看到打印出收到的消息了。

4、重复消费的问题:
首先依照8802再建一个消费者8803。然后8803也启动起来,再通过8801发送两条消息,可以看到8802和8803控制台都打印出了消息,也就是重复消费了。

8802
8803

重复消费肯定是不行的。出现这种情况的原因是:rabbitmq默认消费者处于不同的group,在不同group中的消费者都可以消费消息。解决办法就是:将这两个消费者设置为同一group,同一group的消费者是竞争关系,能够保证消息只被其中一个消费者消费。

  • 在8802和8803的yml中都加上一行配置:
bindings: # 服务的整合处理
        input: # 通道名
          destination: test-stream # rabbitmq的exchange的名称
          content-type: application/json
          binder: defaultRabbit # 消息服务设置的名称
          group: A

即最后一行:group: A,将两个消费者组名都设置为A。这样就解决了重复消费问题,并且两个消费者轮询消费。假如8801发送了两条消息,那么8802和8803分别会消费一条消息。

5、持久化:
现在关闭8802和8803,然后用8801发4条消息。把8802的group分组去掉,8803的保留。最后启动8802和8803,会发现,8802没有收到任何消息,而8803消费了4条消息。也就是说,加上了group配置,就做了持久化,即使消费者宕机了,重启后还是可以消费到。

二、服务链路追踪之springcloud sleuth

各个微服务之间相互调用,形成了复杂的调用链路。sleuth就是来监控追踪这调用链路的,搭配zipkin使用。sleuth负责收集调用信息,zipkin负责展现。

1、zipkin的安装:

  • 下载jar包:dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server
  • 运行下载的jar包:
普通运行
java -jar zipkin-server-2.12.9-exec.jar
后台运行
nohup java -jar zipkin-server-2.12.9-exec.jar &
  • 控制台:ip:9411/zipkin

    zipkin

  • 修改8001和80,加上zipkin的依赖和配置:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
spring:
  application:
    name: cloud-payment-service #提交到注册中心的微服务名称
    zipkin:
      base-url: http://192.168.0.106:9411
    sleuth:
      sampler:
        probability: 1 # 采样率值介于0和1之间,1表示全部采集

中级篇完结

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352