当前位置: 首页 > news >正文

快速做网站套餐网络广告投放

快速做网站套餐,网络广告投放,河北网络推广公司,一键生成微信小程序Spring Cloud StreamSrping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构。Spring Cloud Stream本身对Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Extern…

Spring Cloud Stream

Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构。Spring Cloud Stream本身对Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个服务之间的通讯来演示Spring Cloud Stream的使用方法。

整体概述

372043f11e219d5cb1f6da67718f6bf1.png

服务要想与其他服务通讯要定义通道,一般会定义输出通道和输入通道,输出通道用于发送消息,输入通道用于接收消息,每个通道都会有个名字(输入和输出只是通道类型,可以用不同的名字定义很多很多通道),不同通道的名字不能相同否则会报错(输入通道和输出通道不同类型的通道名称也不能相同),绑定器是操作RabbitMQ或Kafka的抽象层,为了屏蔽操作这些消息中间件的复杂性和不一致性,绑定器会用通道的名字在消息中间件中定义主题,一个主题内的消息生产者来自多个服务,一个主题内消息的消费者也是多个服务,也就是说消息的发布和消费是通过主题进行定义和组织的,通道的名字就是主题的名字,在RabbitMQ中主题使用Exchanges实现,在Kafka中主题使用Topic实现。

准备环境

创建两个项目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我们用Spring Cloud Stream实现通讯,spring-cloud-stream-b我们用Spring Cloud Stream的底层模块Spring Integration实现通讯。

两个项目的POM文件依赖都是:

org.springframework.cloud

spring-cloud-stream

org.springframework.cloud

spring-cloud-stream-binder-rabbit

org.springframework.boot

spring-boot-starter-test

test

org.springframework.cloud

spring-cloud-stream-test-support

test

spring-cloud-stream-binder-rabbit是指绑定器的实现使用RabbitMQ。

项目配置内容application.properties:

spring.application.name=spring-cloud-stream-a

server.port=9010

#设置默认绑定器

spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

spring.application.name=spring-cloud-stream-b

server.port=9011

#设置默认绑定器

spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

启动一个rabbitmq:

docker pull rabbitmq:3-management

docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

编写A项目代码

在A项目中定义一个输入通道一个输出通道,定义通道在接口中使用@Input和@Output注解定义,程序启动的时候Spring Cloud Stream会根据接口定义将实现类自动注入(Spring Cloud Stream自动实现该接口不需要写代码)。

A服务输入通道,通道名称ChatExchanges.A.Input,接口定义输入通道必须返回SubscribableChannel:

public interface ChatInput {

String INPUT = "ChatExchanges.A.Input";

@Input(ChatInput.INPUT)

SubscribableChannel input();

}

A服务输出通道,通道名称ChatExchanges.A.Output,输出通道必须返回MessageChannel:

public interface ChatOutput {

String OUTPUT = "ChatExchanges.A.Output";

@Output(ChatOutput.OUTPUT)

MessageChannel output();

}

定义消息实体类:

public class ChatMessage implements Serializable {

private String name;

private String message;

private Date chatDate;

//没有无参数的构造函数并行化会出错

private ChatMessage(){}

public ChatMessage(String name,String message,Date chatDate){

this.name = name;

this.message = message;

this.chatDate = chatDate;

}

public String getName(){

return this.name;

}

public String getMessage(){

return this.message;

}

public Date getChatDate() { return this.chatDate; }

public String ShowMessage(){

return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);

}

}

在业务处理类上用@EnableBinding注解绑定输入通道和输出通道,这个绑定动作其实就是创建并注册输入和输出通道的实现类到Bean中,所以可以直接是使用@Autowired进行注入使用,另外消息的串行化默认使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解进行指定通道消息的监听:

//ChatInput.class的输入通道不在这里绑定,监听到数据会找不到AClient类的引用。

//Input和Output通道定义的名字不能一样,否则程序启动会抛异常。

@EnableBinding({ChatOutput.class,ChatInput.class})

public class AClient {

private static Logger logger = LoggerFactory.getLogger(AClient.class);

@Autowired

private ChatOutput chatOutput;

//StreamListener自带了Json转对象的能力,收到B的消息打印并回复B一个新的消息。

@StreamListener(ChatInput.INPUT)

public void PrintInput(ChatMessage message) {

logger.info(message.ShowMessage());

ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());

}

}

到此A项目代码编写完成。

编写B项目代码

B项目使用Spring Integration实现消息的发布和消费,定义通道时我们要交换输入通道和输出通道的名称:

public interface ChatProcessor {

String OUTPUT = "ChatExchanges.A.Input";

String INPUT = "ChatExchanges.A.Output";

@Input(ChatProcessor.INPUT)

SubscribableChannel input();

@Output(ChatProcessor.OUTPUT)

MessageChannel output();

}

消息实体类:

public class ChatMessage {

private String name;

private String message;

private Date chatDate;

//没有无参数的构造函数并行化会出错

private ChatMessage(){}

public ChatMessage(String name,String message,Date chatDate){

this.name = name;

this.message = message;

this.chatDate = chatDate;

}

public String getName(){

return this.name;

}

public String getMessage(){

return this.message;

}

public Date getChatDate() { return this.chatDate; }

public String ShowMessage(){

return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);

}

}

业务处理类用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解发布消息:

@EnableBinding(ChatProcessor.class)

public class BClient {

private static Logger logger = LoggerFactory.getLogger(BClient.class);

//@ServiceActivator没有Json转对象的能力需要借助@Transformer注解

@ServiceActivator(inputChannel=ChatProcessor.INPUT)

public void PrintInput(ChatMessage message) {

logger.info(message.ShowMessage());

}

@Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)

public ChatMessage transform(String message) throws Exception{

ObjectMapper objectMapper = new ObjectMapper();

return objectMapper.readValue(message,ChatMessage.class);

}

//每秒发出一个消息给A

@Bean

@InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))

public GenericMessage SendChatMessage(){

ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());

GenericMessage gm = new GenericMessage<>(message);

return gm;

}

}

运行程序

启动A项目和B项目:

842225f504011a2726b3791f4d30f13f.png

47150ecf747e8cd13c660e8daa7ba9ac.png

源码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

http://www.lbrq.cn/news/2739511.html

相关文章:

  • 室内设计效果图的网站山西网页制作
  • python web 网站开发沈阳seo关键词排名
  • 个人备案做视频网站百度指数里的资讯指数是什么
  • 网站建设常用的方法百度软件市场
  • 泊头哪里建网站呢zac seo博客
  • 没事网站建设项目规划书优化方案英语
  • 怎么改版网站网络整合营销方案ppt
  • 唯品会 一家专门做特卖的网站手机版太原百度关键词优化
  • 广州番禺桥南做网站江西seo
  • 高端互联网网站招工 最新招聘信息
  • 做一个公司动态网站北京线上教学
  • ppt做的模板下载网站有哪些网络公司排行榜
  • 做网站业务员应该了解什么外链怎么打开
  • 厦门旅游网站建设目的营销策略都有哪些方面
  • 做网站用哪些语言如何软件网站优化公司
  • 深圳开发网站的公司哪家好互联网推广的好处
  • 商业网站开发文档哈尔滨关键词优化报价
  • 同一个域名在一个服务器做两件网站泰安seo网络公司
  • 菜鸟怎么做网站贵阳seo网站推广
  • 税务网站建设的基本要求营销软文是什么
  • 做英德红茶的网站站点搜索
  • 重庆门户网站排名保定关键词排名推广
  • 如何用dw制作网页框架北京seo百科
  • 网站信息服务费怎么做分录搭建网站的步骤和顺序
  • 深圳做网站的好公司百度官方网站下载
  • 写网站建设需求深圳市网络营销推广服务公司
  • 想建一个自己的网站北京优化靠谱的公司
  • 大学校园网站建设方案免费注册网址
  • 网上注册公司的网址优化设计答案
  • 网站开发工程师证书有用吗站长工具网站推广
  • 《AI 与数据质量的深度碰撞:颠覆传统治理模式的变革》文章提纲
  • 深度解析Java synchronized关键字及其底层实现原理
  • python---包
  • ESP32 C3 开发板使用教程 01-测试显示屏
  • Qt 关于QString和std::string数据截断的问题- 遇到\0或者0x00如何处理?
  • SpringBoot 集成Ollama 本地大模型