rabbitmq+websocket(SpringBoot版)实现分布式消息推送

2019/12/05 16:46
阅读数 782

本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同

所以采用 rabbitMQ+webSocket实现分布式消息推送

生产者将消息 发送给 rabbitMQ 的 virtual-host:/(顶极路由) 再由它路由到交换机 最终由交换机通过路由键指定具体的管道

消费者监听指定的管道获取消息

最终将获取的消息 交给 webSocket 被@OnMessage注解标识的方法

每次消费一条消息交给 被@OnMessage注解标识的方法 返回给前台 

实现分布式实时推送

1.配置rabbitMQ

消息生产者

1.1pom.xml

1 <!--引入rabbitmq依赖-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>
 1 server:
 2   port: 5002
 3 
 4 spring:
 5   rabbitmq:
 6     host: localhost
 7     #账号密码 默认有的
 8     username: guest
 9     password: guest
10     #rbbitmq虚拟主机路径
11     virtual-host: /
12     #rabbitmq的端口号 也是默认的
13     port: 5672
 1 @SpringBootApplication
 2 @MapperScan(basePackages = "com.supplychain.dao")
 3 @EnableRabbit/**开启rabbitmq*/
 4 public class ThumbsupServer5002_App {
 5 
 6     public static void main(String[]args){
 7 
 8         SpringApplication.run(ThumbsupServer5002_App.class,args);
 9 
10     }
11 
12     /**消息的转换器
13      * 设置成json 并放入到Spring中
14      * */
15     @Bean
16     public MessageConverter messageConverter(){
17 
18         return new Jackson2JsonMessageConverter();
19 
20     }
21 }

测试发送消息

 1 @RunWith(SpringRunner.class)
 2 @SpringBootTest
 3 public class ThumbsupServer5002_AppTest {
 4 
 5 
 6     @Autowired
 7     private RabbitTemplate rabbitTemplate;
 8 
 9     @Test
10     public void contextLoads() {
11 
12         UserTest userTest = new UserTest("hao", "651238730@qq.com");
13 
14         /**1.指定发送的交换机
15          *      发送的消息会先发送给 virtual-host: /(顶级路由) 再由它到交换机
16          *      由交换机通过路由键指定给具体的管道
17          *
18          * 2.路由键
19          *      有的交换机需要路由键 有的不需要(发送给交换机的消息会被发送给所有管道)
20          *
21          * 3.发送的消息
22          *      如果是对象的话必须实现序列化接口因为网络传输只能传二进制
23          *
24          * */
25         rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest);
26     }
27 
28 }

2.消息消费者

同样是pom.xml需要引入rabbitMQ依赖

1 <!--引入rabbitmq依赖-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>

同样需要配置application.yml

 1 spring:
 2   rabbitmq:
 3     host: 127.0.0.1
 4     #账号密码 默认有的
 5     username: guest
 6     password: guest
 7     #rbbitmq虚拟主机路径
 8     virtual-host: /
 9     #rabbitmq的端口号 也是默认的
10     port: 5672
11     listener:
12       simple:
13         acknowledge-mode: manual #手动接受数据
14         #max-concurrency: 10 #最大并发
15         #prefetch: 1 #限流

同样主启动类中需要开启RabbitMQ

 1 @SpringBootApplication
 2 @EnableRabbit
 3 public class MessageServer5003_App {
 4 
 5     public static void main(String[]args){
 6 
 7         SpringApplication.run(MessageServer5003_App.class,args);
 8 
 9     }
10 
11     /**这里也需要设置消息转换类型
12      * 和发送的消息类型一定要对应
13      * 不然对象接受json启动主程序类时就会报错
14      * */
15     @Bean
16     public MessageConverter messageConverter(){
17 
18         return new Jackson2JsonMessageConverter();
19 
20     }
21 
22 }

下面到了整合的环节了

  1 @ServerEndpoint(value = "/websocket")
  2 @Component
  3 public class WebSocketServer {
  4 
  5     //静态变量 用于记录当前在线连接数 应该把它设计成线程安全
  6     private static int onlineCount=0;
  7 
  8     /**Concurrent包下的 写时复制Set 用它作于存储客户端对应的MyWebSocket对象*/
  9     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet= new CopyOnWriteArraySet<WebSocketServer>();
 10 
 11 
 12     /**与某个客户端的链接会话,需要通过它来给客户端发送数据*/
 13 
 14     private Session session;
 15     /**
 16      * 参数1:Message 可以获得消息的内容字节 还可以获得消息的其他属性
 17      * 参数2:可以写确定接受的参数类型比如User
 18      * 参数3:Channel 通道
 19      *      com.rabbitmq.client.Channel必须是这个包下
 20      *      通过这个参数可以拒绝消息
 21      *      让rabbitmq再发给别的消费者
 22      *
 23      * 使用@RabbitListener 可以绑定交换机  路由键 管道
 24      *
 25      */
 26     @RabbitListener(bindings = @QueueBinding(
 27          value = @Queue(value = "userTest-queue",durable = "true"),
 28          exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"),
 29          key = "userTest-key"
 30         )
 31     )
 32     @RabbitHandler//注解意思:如果有消息过来 需要消费的时候才会调用该方法
 33     /**如果已知传递的参数是 UserTest对象可以通过该注解
 34      * 消息头需要用map接受
 35      * 既然是手动接受消息 就需要设置channel
 36      * */
 37     public void receiveUserMessage(@Payload UserTest userTest, @Headers Map<String,Object> headers, Channel channel) throws IOException {
 38         //sendMessage(message.toString());
 39         System.out.println("UserTest对象"+userTest);
 40         onMessage(userTest.toString());//调用消息方法将数据船体给他
 41 
 42         Long deliveryTag= (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
 43         //手动接受并告诉rabbitmq消息已经接受了  deliverTag记录接受消息 false不批量接受
 44         channel.basicAck(deliveryTag,false);
 45 
 46         /**
 47          * basicReject()
 48          * 参数1: 消息标签
 49          * 参数2: true 将消息从新放入队列  false 接受到并将消息抛弃
 50          *
 51          *
 52         try {
 53             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
 54             System.out.println(message);
 55         } catch (IOException e) {
 56             e.printStackTrace();
 57         }
 58      */
 59 
 60     }
 61 
 62     /**服务器端推送消息*/
 63     public void sendMessage(String message){
 64         try {
 65             System.out.println("session可否显示出来"+session);
 66             this.session.getBasicRemote().sendText(message);
 67         } catch (IOException e) {
 68             e.printStackTrace();
 69         }
 70     }
 71 
 72     /**
 73      * 连接建立成功调用的方法
 74      * */
 75     @OnOpen
 76     public void onOpen(Session session){
 77         this.session=session;
 78         webSocketSet.add(this);
 79         System.out.println("有新的连接加入!当前在线人数为"+getOnlineCount());
 80         System.out.println(session);
 81     }
 82 
 83     /**
 84      * 连接关闭调用的方法
 85      * */
 86     @OnClose
 87     public void onClose(){
 88         /**从安全Set中 移除当前连接对象*/
 89         webSocketSet.remove(this);
 90         subOnlineCount();
 91         System.out.println("有一连接关闭!当前在线人数为"+getOnlineCount());
 92     }
 93 
 94 
 95 
 96     @OnMessage
 97     public void onMessage(String message){
 98 
 99         System.out.println("来自客户端的消息:"+message);
100 
101         for (WebSocketServer webSocketServer:webSocketSet){
102             webSocketServer.sendMessage(message);
103         }
104 
105     }
106 
107 
108     public static int getOnlineCount() {
109         return onlineCount;
110     }
111 
112     public static synchronized void addOnlineCount() {
113         WebSocketServer.onlineCount++;
114     }
115 
116     public static synchronized void subOnlineCount() {
117         WebSocketServer.onlineCount--;
118     }
119 
120 
121 
122 }

websocket前端

websocket是html5提出的协议属于双工通信 前端发送一次请求告诉服务器需要将http协议升级成tcp长连接

后面服务端直接给前端推送消息就可以了 从以前的一次请求一次响应 服务端被动式 变成 一次请求服务端可以无限响应

 1 <script>
 2         var socket;
 3         console.log(typeof socket)
 4         if (typeof(WebSocket)=="undefined"){
 5             alert("您的浏览器不支持WebSocket");
 6         }else{
 7             alert("您的浏览器支持WebSocket");
 8 
 9            socket=new WebSocket("ws://localhost:5003/websocket");
10 
11            socket.onopen=function () {
12                console.log("Socket 已打开");
13            };
14 
15             //获得消息事件
16             socket.onmessage = function(msg) {
17                 console.log(msg.data);
18                 //发现消息进入    调后台获取
19                 //getCallingList();
20             };
21 
22             //关闭事件
23             socket.onclose = function() {
24                 console.log("Socket已关闭");
25             };
26             //发生了错误事件
27             socket.onerror = function() {
28                 alert("Socket发生了错误");
29             };
30             /**
31             $(window).unload(function(){
32                 socket.close();
33             });
34              */
35         }
36     </script>

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部