spring websocket中 STOMP

原创
2017/04/14 15:19
阅读数 8.9K

26.4 基于WebSocket消息架构STOMP

WebSocket协议定义了两种消息类型,文本或字节,但是没定义它们的内容.它有意让客户端和服务端通过通用的子协议(例如,更高水平的协议)来定义消息语法.但是WebSocket协议的使用是可选的,客户端和服务端需要同意某些种类的协议来翻译这些消息.

26.4.1 STOMP概述

STOMP是一个简单的面向文本的消息协议,原来是为Ruby,Python,Perl等脚本语言创建的,用来连接企业消息代理器.设计它的目的是解决一部分通用的消息格式问题.STOMP可以任何可靠的双向流协议中使用,比如TCP或WebSocket.尽管STOMP是一个面向文本的协议,但是消息的内容可以是文本或字节.

STOMP 是一个基于Http框架的协议.STOMP框架的结构如下:

COMMAND
header1:value1
header2:value2

Body^@

客户端可以使用"SEND"或"SUBSCRIBE"命名来发送或订阅消息,需要一个目的地,用来描述这个消息是什么和谁来接收它.它会激活一个简单的发布订阅机制,可以用来通过代理向其他连接的客户端来发送消息或者向服务端发送消息来要求执行一些工作.

当使用spring的STOMP支持时,spring webSocket应用扮演了客户端的STOMP代理器的角色.消息被路由到@Controller里的消息处理方法,或一个简单的,内置的用来跟踪订阅或广播消息的代理器来转发给订阅者.你也可以配置一个专用的STOMP代理器(如RabbitMQ,ActiveMQ等)进行消息广播.在这种情况下,spring会保持代理的TCP连接,传递消息给它,也通过它把消息传递给连接的webSocket客户端.所以spring web应用可以依赖统一的基于http的安全,通用验证;相同的编程模式在消息处理上也起作用.

这里有一个例子,客户端订阅接收服务端定期发布的股票报价,例如定时通过SimpMessagingTemplate将消息发送到代理器;

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

下面例子:一个客户端发送交易请求,服务端通过@NessageMapping的方法处理,处理完之后,将交易信息和细节广播给客户端.

SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

目的地意味着在STOMP规则中有意遗留印记.它可以是任何符合STOMP服务器支持的目的地的语法和语义的字符串.但是,通常来说,路径字符串中"/topic/.."是发布订阅(一对多),"/queue/"意味着点到点的信息交换(一对一).

STOMP服务器可以用Message命令向所有订阅者发布消息.这里是一个服务器向一个订阅客户端发送股票交易的例子:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1

destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

你需要知道服务器不会发送未被订阅的消息.服务器端所有的消息都对应一个定义的客户端订阅,并且服务器消息的"subscription-id"头必须与客户端订阅的Id头一致的.

上面的简介是为了提供对STOMP协议最简单的理解.你可以去阅读该协议的说明书了解更多.

使用STOMP作为WebSocket子协议的好处:

  • 无需创建一个自定义消息格式
  • 可以在浏览器中使用已存在的stomp.js
  • 可以根据端点来回路消息
  • 可以使用成熟的消息代理器,如RabbitMQ,ActiveMQ等进行广播

最重要的一点是使用STOMP可以像spring-mvc提供的基于HTTP的编程模式来启用spring提供的应用级别的编程模式.

26.4.2 在WebSocket上启用STOMP

spring框架通过spring-message,spring-websocket模块提供了对基于WebSocket的STOMP协议的支持.下面有个通过在"/portfolio"暴露STOMP webSocket/SockJS端点例子,这里终点以"app"开头的消息会回路到消息处理方法(如应用业务);另外的以"/topic","/queue"开头的消息会回路到消息代理器(如,广播到其他在线客户端)

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }

}

在xml中的配置


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>

</beans>

这个"/app"前缀是任选的,它只是用来分区那些应该被回路到消息处理方法中由应用处理的消息,和那么由代理器广播到订阅客户端的消息.

而"/topic","/queue"前缀依赖于使用中的代理器.在简单的,内置代理下这个前缀没有特定的含义,它无法区分这个终点如何使用.(发布-订阅目标有很多订阅者或一般只针对单个接受者的点对点消息).如果使用备用代理器,大多数代理器使用"/topic"作为发布/订阅语义终点的前缀,并把"/queue"作为点做点语义终点的前缀.

在浏览器里,客户端可以通过使用Stomp.js和sockjs-client连接

var socket=new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient =Stomp.over(socket);
stompClient.connect({},function(frame){
});

又或者通过WebSocket(不通过SockJS)连接

var socket=new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient=Stomp.over(socket);

stompClient.connect({},function(frame){
});

记住stompClient不需要指定login或passcode头.即使加了,客户端也会忽略或者会覆盖掉它.可以通过查看 26.4.8节,成熟代理器的链接,和26.4.10 安全,来获取更多安全信息

26.4.3 Flow of Messages

当STOMP端点被配置,spring应用会扮演链接客户端的STOMP代理器的角色.本节提供了一个大图来展示消息是如何在应用内部流动的.

spring-messaging模块听过异步消息处理的基础.它包含了大量源于spring Integration项目里的抽象,并把它们作为消息应用里的基础模块.

  • Message 一个有头和内容的消息

  • MessageHandler 消息处理的协议

  • MessageChannel 消息发送的协议,可以减少发送者和接受者之间的耦合.

  • SubScribableChannel (订阅频道)继承了消息协议,将消息发送到已注册的消息处理的订阅者们.

  • ExecutorSubscribableChannel 订阅协议的具体实现,可以通过线程池实现异步分发消息.

@EnableWebSocketMessageBroker java配置和websocket:message-broker的xml配置共同构成一个复杂消息流.下面的图表展示的是使用简单,内存代理器的情况.

输入图片说明

上面的图片包含了三个消息频道:

  • ClientInboundChannel用于接收从webSocket客户端发出的消息.
  • clientOutboundChanel 用于向WebSocket客户端发送消息
  • brokerChannel 应用内部消息的代理器

这三个频道同样在专用代理器里使用,除了"代理器替身"替代了简单代理器.

输入图片说明

"clientInboundChannel"里的消息流向注解方法给应用处理(例如股票交易执行请求)或直接发送给代理器(例如客户端订阅股票信息报价).这个STOMP目的地用于简单的前缀回路.例如,"/app"前缀的消息回路到注解方法,而"/topic","/queue"前缀的消息用于回路到代理器.

当一个消息处理注解的方法有返回值时,它的返回值会作为spring消息的内容发送到"brokerChannel".这个代理器会把这个消息广播到客户端.只要借助消息模板,你可以在应用的任何地方将消息发送到任何一个目的地.例如,一个http的POST处理方法可以广播一个消息到所有的关联客户端,或者一个服务组件可以定时广播股票价格.

下面是一个简单说明消息的例子:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }

}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }

}

下面是关于上例中消息流的解释:

  • WebSocket 客户端链接WebSocket端点"/portfolio"

  • 关于"/topic/greeting"订阅通过"clientInboundChannel"传播到代理器

  • 发送到"/app/greeting"的拥抱通过"clientInboundChannel"进入并跳转到GreetingController.控制器添加了当前时间,它的返回值通过"brokerChannel"作为消息发送到"/topic/greeting"端口.(终点一般根据协议选择,但是可以通过@SendTo来重写)

  • 代理器随后把消息广播给订阅者,他们通过"clientOutboundChannel"实现传输.

下节提供注解方法更多的细节,包括参数的种类和返回值的支持.

26.4.4 注解消息处理

@MessageMapping注解可以在@Controller类的方法上使用.它既可以来用表示消息目的地的映射方法,也可以组合成类级别的@MessageMapping,用来表示控制器里的所有映射方法的共用映射.

默认的目的地映射被认为是Ant风格,斜线分割,路径格式.例如"/foo*","/foo/**".这里还包括模板变量,如"/foo/{id}",它可以被@DestinationVariable注解来引用.

也可以使用逗号作为分隔符.

@MessageMapping标志的方法支持一下方法参数:

  • Message 方法参数,用来获取要处理的完整消息

  • @Payload 用来获取消息内容的注解参数,由org.springframework.messaging.converter.MessageConverter转化.这个注解不是必须的,因为默认它是存在的.带有验证注解的负载方法的参数注解受限于JSR-303验证.

  • @Header 必要时可以使用org.springframework.messaging.converter.Converter类来访问特定的头的值

  • @Headers 该注解的方法参数必须指定为java.util.Map用来访问消息里所有的头

  • MessageHeaders 用来获取消息里所有头的Map的方法参数

  • MessageHeaderAccessor,SimpMessageHeaderAccessor,StompHeaderAccessor都是经过类型访问方法来访问头信息.

  • @DestinationVariable 用来访问消息目的地中模板参数的注解参数.他的值可以更具需要转为成申明的方法参数类型.

  • java.security.Principal 在webSocket进行http握手时,用来反射用户日志的方法参数

@MessageMapping 方法可以被org.springframework.messaging.converter.MessageConverter转化器转为有一个新的消息主体,默认,它们会当作客户端消息用相同的目的地,但使用"/topic"前缀来发送到"brokerChannel"频道.用@SendTo可以指定其他目的地.他还可以设置一个类级别的共用目的地.

返回的消息可以异步提供,通过ListenableFuture或CompletableFutrue/ComplentionStage等返回类型签名,类似springmvc相似的方法.

@SubscribeMapping注解可以将订阅请求映射到@Controller方法里.它支持方法级别,但是可以被组合为类别的@MessageMapping注解,可以让相同控制器里处理方法共享..

默认@SubscribeMapping方法的返回值会直接返回到连接的客户端,不需要通过代理器中转.这对请求回复的消息交互非常有用,例如,应用UI初始化时,可以方便的获取应用数据.另外@SubscribeMapping方法也可以被@SendTo方法注解,结果消息被被发送到"brokerChannel"用特定的目标目的地.

有时一个控制器在运行时需要用AOP代理装饰.例如,你在自己的控制器上选择添加@Transactional注解.在这种情况下,对于这些控制器,我们建议使用类级别的代理.这是控制器的典型默认选择.但是如果一个控制器实现了一个spring上下文没有回调的接口,你需要明确的配置基于类的代理.例如,tx:annotation-driven/,需要转化为<tx:annotation-driven proxy-target-class="true"/>

26.4.5 Sending messages 发送消息

你是否想从应用的任何地方都想已连接的客户端发送消息.每个应用组件都可以把消息发送到"brokerChannel"中.最简单的实现方式是进行SimpMessagingTemplate注入,并用他发送消息.一般它很容易按类型注入,例如:

@Controller
public class GreetingController{
@Autowired
private SimpMessagingTemplate template;

@RequestMapping(path="/greetings",method=POST)
public void greet(String greeting){
   String text="["+getTimestamp()+"]"+greeting;
   template.convertAndSend("/tpic/greetings",text)
}

}

但是这个等同"brokerMessagingTemplate".

26.4.6 Simple Broker 简单代理器

这个简单的内置的代理器处理客户端的订阅请求,把它们储存到内存,并根据目的地匹配来广播消息到在线客户端.这个代理器支持路径风格目的地,包括ANT风格的目的地匹配.

ant风格路径:http://blog.csdn.net/wangshfa/article/details/26471641

26.4.7 成熟代理器

简单代理器容易上手,但只支持部分STOMP命令,它基于一个简单的消息发送池,且不支持集群.相应的,应用可以通过使用成熟的消息代理器来升级.

检查STOMP的文档,选择适合的消息代理器,安装他们,并在STOMP支持下运行他们.接着用spring配置中的STOMP代理器替身来替代简单代理器. 下面的例子是如何配置一个成熟的代理器

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

xml配置如下:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

</beans>

上述例子中配置的"STOMP broker relay"是一个spring的消息处理器,他将消息转发给外部的消息代理器.他会建立到代理器的TCp链接,转发所有的消息给他,并把所有从代理器收到的消息通过webSocket session转发给客户端.基本上他扮演了一个中介器的作用,向两边发送消息.

spring使用org.projecteactor:reactor-net和io.netty:netty-all来管理TCP连接,需要时可以将他们作为依赖添加.

spring4.3支持的STOMP代理器兼容2.0x的反应器,所以它不支持与spring-cloud-stream-reactive聚合,它需要3.0X的反应器.

spring5依赖于Reactor3和Reactor Netty,他们有独立的版本,支持STOMP的代理器,还对活性编程模式提供了大量支持.

另外,服务组件还是可以向代理器替身发送信息,以进行广播的

实际上,代理器替身使得消息广播更加健壮和可扩展.

26.4.8 Connections To Full-Featured Broker连接到成熟代理器

STOMP代理器中介器保持这地代理器的一个系统级别的TCP链接.这个连接用来接收服务端产生的消息,而不是接收其他消息.你可以设置该链接的凭证,如STOMP框架的登陆和密码头.这个可以在XML命令空间或java配置里通过设置SystemLogin/systemPasscode属性来设置,默认值是guest/guest.

STOMP代理器中介器也为每个连接的webSocket客户端创建了一个单独的TCP连接.你可以为所有的客户端连接配置STOMP凭证.这个可以在XML命令空间或java配置里通过设置SystemLogin/systemPasscode属性来设置,默认值是guest/guest.

STOMP代理器中介器一般代表客户端给每个跳转到的代理器的连接框架设置login和passcode头.所以WebSocket客户端不需要设置这么头,他们会被忽略.下面的部分,webSocket客户端可以依赖HTTP安全来保护WebSocket端点和创建客户端身份.

STOMP代理器中继会通过"系统"的TCP连接向消息代理器发送和接受心跳消息.你可以设置发送和接受心跳的频率(默认10秒每次).如果指向代理器的链接消失了,代理器中介会每5分钟一次,持续尝试重连,直到成功.

spring的bean可以实现为ApplicationListener<BrokerAvailabilityEvent>的接口,用来接收指向代理器的系统连接中断或重连的通知.例如,当这里没有可用的系统连接时,一个股票价格服务可以停止尝试发送消息.

STOMP代理器中介可以通过virtualHost来配置.这个属性的值可以被设置到每个连接框架的host头里,这会很有用,例如在一个云环境里,每一个TCP连接的实际地址会根据云基础STOMP服务提供host的不同而差异.

26.4.9 在@MessageMapping 目的地里使用句号分隔符

尽管斜线分割的路径格式被web开发者熟知,但在消息开发里,"."是主流,例如,在主题的名字,队列,交换者等.应用也可以通过配置自定义的AntPathMatcher在@MessageMapping映射中使用句号来代替斜线.

java配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  // ...

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/queue/", "/topic/");
    registry.setApplicationDestinationPrefixes("/app");
    registry.setPathMatcher(new AntPathMatcher("."));
  }

}

在xml配置

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:websocket="http://www.springframework.org/schema/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/websocket
    http://www.springframework.org/schema/websocket/spring-websocket.xsd">

  <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
    <websocket:stomp-endpoint path="/stomp" />
    <websocket:simple-broker prefix="/topic, /queue"/>
  </websocket:message-broker>

  <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
    <constructor-arg index="0" value="." />
  </bean>

</beans>

下面是控制器中使用"."分隔符的简单例子

@Controller
@MessageMapping("foo")
public class FooController {

  @MessageMapping("bar.{baz}")
  public void handleBaz(@DestinationVariable String baz) {
  }

}

如果这个应用的前缀是"/app",那么这个方法可以被映射为"/app/foo.bar.{baz}";

26.4.10 Authentication 认证

每一个WebSocket的消息的STOMP session开始于一个http请求,然后它可以被升级为WebSockets,或者被回退为一系列的SockJS http传输请求.

web应用早已用认证和权限来保护HTTp请求.一般一个用户会经过spring安全的一些机制,如登陆也,http基础认证或其他来认真.已认证用户的认证的上下文被保存到Http session里,并与基于相同cookie的session相关联.

因此对于一个WebSocket握手,或者SockJS的http传输请求,一般它们已经通过HttpServletRequest#getUserPrincipal()认证为一个认证用户.spring 会自动给这些用户创建WebSocket或SockJS session,随后的STOMP消息传输通过一个用户头来进行传输.

上面对于一个典型的web应用来言没有什么特别的,它们已经为安全这么做了.用户通过http请求进行认证,并通过基于cookie的http session持有这个上下文.这样SockJS或WebSocket就会为用户创建Session,并添加一个用户头的戳,这样在应用中它们就可与进行消息传输了.

记住 STOMP协议在连接框架中有一个"login"或"passcode"头.这些开始就是为,现在也是为基于TCP的STOMP设计的.但是,spring一般会忽略STOMP协议头的认证信息,它认为这个用户已经被http传输请求认证过了,并期望WebSocket或SockJs session包含认证用户.

spring 安全提供了webSocket 子协议认证:通过一个 ChannelInterceptor基于消息中的用户头来认证消息.另外,spring session提供了WebSocket integration确保使用webSocket session时,http session不会过期.

26.4.11 基于令牌的认证

Spring Security OAuth 支持基于令牌的安全包括JSON Web Token.这个可以作为web应用的安全机制,包括基于WebSocket的STOMP交互,如上下文所述一样,通过一个基于cookie的session来保持认证.

同时基于Cookie的session并不是最好的选择,例如在应用里,他们不希望持有来自服务器端的session,在手机应用中,它们更倾向于用用户头进行安全认真.

在 webSocket protocol RFC 6455中提及,在webSocket握手期间,服务器对客户端的身份验证,不需要指定特别的方式.实际上,虽然浏览器客户端可以是用标准的认证头或cookie,但无法使用自定义头.例如SockJS js客户端无法提供一个方式来发送Http头,查看 sockjs-client issue 196. 但是它允许发送查询参数,这可以用来发送token.这个也有缺点,如,这个令牌带着URL可能无意中被服务器日志记录下来.

以上的缺点只针对基于浏览器的客户端,而不是基于java的客户端.它支持在webSocket或SockJS请求中发送头消息.

不倾向于使用cookie,但在http协议水平上没有更好的方法.倾向于使用安全头,这里有两个步骤:

    1. 在连接期间使用STOMP 客户端发送认证头
    1. 使用ChannelInterceptor处理认证头

下面的例子中服务侧配置会注册一个自定义认证拦截器.记住这个拦截器需要认证并在连接信息中设置用户头.spring会几率并存这些认证用户,在随后的相同session的STOMP消息会用到他们.

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new ChannelInterceptorAdapter() {

        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {

            StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

            if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                Principal user = ... ; // access authentication header(s)
                accessor.setUser(user);
            }

            return message;
        }
    });
  }
}

还要记住在消息中使用spring Security的安全机制,首先你要保证你的认证的ChannelInterceptor配置优先于spring安全的.最好的做法就是在AbstatctWebSocketMessageBrokerConfigurer的子类中宣布自定义拦截的标记为@Order(Ordered.HIGHEST_PRECEDENCE + 99)

26.4.12 用户目的地

一个应用可以发送消息给特定的用户,spring的STOMP通过在目的地添加前缀"/user/"来支持它.例如,一个客户端可能订阅目的地"/user/queue/position-updates".这个目的地可以被UserDestinationMessageHandler处理,被会加上唯一的用户session,例如"/queue/position-updates-user123".它提供了通用的订阅名称目的地的便利,同时保证与其他订阅相同的目的地的用户没有冲突,所以每个用户都会收到唯一的股票位置更新.

在发送端消息可以被发送到目的地例如:"/user/{username}/queue/position-updates",这个目的地会被UserDestinationMessageHandler转译为一个或多个目的地,对应用户的每个session.这允许应用中的所有组件发送消息给一个特定的用户,只要知道这个用户名字和原始的目的地.它还支持像消息模板一样的注解.

例如,一个消息处理方法可以在@SendToUser注解的协助处理下将消息发送给用户.(它还支持类级别的共享公共目的地)

@Controller
public class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

如果用户有多个session,那么默认的所有session订阅对于特定的目的地都是可标志的.但是有时候,它必须只能指向那个发送了要处理消息的session.这个可以通过设置broadCast属性为false来实现.

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // raise MyBusinessException here
    }

    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}

虽然用户的目的地一般意味着是认证用户,但这并不严格.一个与认证用户无关的session也可以订阅用户目的地.在这种情况下,@SendToUser会默认为broadcast=false;即只面向那么发送了消息的session.

还可以通过注入SimpMessagingTemplate的bean发送用户目的地消息.一般通过java配置或XML命名空间注入.如果bean的名字是"brokeMessagingTemplate",则需要匹配@Qualifier.

@Service
public class TradeServiceImpl implements TradeService {

	private final SimpMessagingTemplate messagingTemplate;

	@Autowired
	public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
		this.messagingTemplate = messagingTemplate;
	}

	// ...

	public void afterTradeExecuted(Trade trade) {
		this.messagingTemplate.convertAndSendToUser(
				trade.getUserName(), "/queue/position-updates", trade.getResult());
	}
}

在额外消息代理器下使用用户目的地时,检查代理器的文档了解如何管理未被激活的队列.当用户的session结束,所有唯一的用户队列会被移除.例如,当像/exchange/amq.direct/position-updates目的地被使用时,RabbitMQ创建自动删除的队列.这种情况下,客户端可以订阅/user/exchange/amq.direct/position-updates.类似的,ActiveMQ也有相同的配置选项来清除未启动的目的地.

在混合应用服务器场景下,一个用户目的地可以保持未释放因为用户在关联其他服务器.这种情况下,你需要配置一个可以广播未释放的消息,这样其他服务器有机会去尝试.这个可以通过java配置里的MessageBrokerRegistry类里的userDestinationBroadcast属性配置,或者通过message-broker元素的user-destination-broadcast属性配置.

26.4.13 监听应用上下文事件和拦截消息

一些ApplicationContext事件可以被发布,可以被实现了spring的ApplicationListener接口的类接受.

  • BrokerAvailabilityEvent 说明代理器变成可用/不可用状态.当应用运行时,简单的代理器立即启用,变得可行,那么STOMP 代理器中介会断掉它和成熟代理器的链接,例如当这个成熟代理器重启时.代理器中介有重连机制,当代理器重启后会重新建立链接,所以在连到断链,以及断链到连上他们都会发布事件.使用SimpMessagingTemplate组件应该订阅这个事件,在代理器不可用时避免发送消息.在任何情况下,当发送消息是,他们需要准备处理MessageDeliverException

  • SessionConnectEvent 当一个新的STOMP连接被接收时发布,表明一个新的客户端session.这个事件包含了代表连接的session id,用户信息,所有客户端发送的自定义消息头.这对跟踪客户端session非常有用.订阅这些事件的组件可以使用SimpMessageHeaderAccessor或StompmessageHeaderAccessor包裹包含的消息.

  • SessionConnectedEvent 在SessionConnectEvent事件之后,当代理器向已经发布了一个STOMP Connection框架回应CONNECT之后,就会立即发布.此时,会认为STOMP的session已完全建立.

  • SessionSubscribeEvent 当收到一个新的STOMP订阅时发布.

  • SessionUnsubscribeEvent 当收到一个新的STOMP退订时发布.

  • SessionDisconnectEvent 当STOMP的session关闭时发布.这个关闭可能是从客户端发送的,也可能是WebSocket关闭自动产生的.在某些情况下,每个session的这个事件会发生多次.组件需要在处理混合关闭事件上做到幂等操作.

当你使用一个成熟的代理器,当代理器临时变得不可靠时,这个STOMP代理器中介会自动重连系统连接.但客户端连接不会重连.如果心跳可用,客户端一般会在10秒内发现代理器无法回应.客户端需要实现自己的重连逻辑.

另外,应用可以通过注册ChannelInterceptor在相应的消息频道上拦截进出消息.例如拦截进入的消息:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new MyChannelInterceptor());
  }
}

一个自定义的ChannelInterceptor可以实现基类的ChannelInterceptorAdapter,并使用StompHeaderAccessor或SimpMessageHeaderAccessor来访问消息里的信息.

public class MyChannelInterceptor extends ChannelInterceptorAdapter {

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
    StompCommand command = accessor.getStompCommand();
    // ...
    return message;
  }
}

26.4.14 STOMP客户端

spring提供了一个基于WebSocket的STOMP客户端和基于TCP客户端的STOMP.

开始创建和配置WebSocketStompClient

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上面的例子StandardWebSocketClient可以被SockJsClient替代,因为它也是WebSocketClient的一个实现.这个SockJsClient可以使用WebSocket或http-based传输作为回退.更多的细节查看26.3.7.

下一步是建立一个连接并提供了一个STOMP session的处理器.

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当这个session可以使用时,这个handler会被通知.

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

当session建立之后,它的内容就会发送,并通过已配置的MessageConverter序列化.

session.send("/topic/foo", "payload");

你可以订阅目的地.这个订阅方法要求一个关于订阅信息的处理器并返回可以用来退订的订阅处理.对于每一个收到的信息这个处理器可以指定每个可以被序列化的负载内容的目标对象类型.

session.subscribe("/topic/foo", new StompFrameHandler() {

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }

});

要启用STOMP心跳,需要用TaskScheduler配置webSocketStompClient,可以自定义心跳频率,10秒钟写入不活跃的,它将引发一次心跳发送;10秒读取补货要的,它会关闭连接.

如果在同一机器上进行数千个客户端模拟,则需要考虑关闭心跳.因为每个连接都有自己的心跳任务,这里没有为一个机器上的大量客户端运行做优化.

STOMP协议还支持收据,即客户端必须添加收据头以对应服务器处理发送或订阅之后返回的RECEIPT框架.为了支持它,StompSession提供了setAutoReceipt(boolean)属性,它可以在随后的每次发送和订阅都添加"receipt"头.你还可以手动选择添加一个"receipt"头到StompHeaders里.发送和订阅可以返回一个Receiptable实例,这样可以被用来注册收据成功或失败回调.对于该功能,客户端必须配置TaskScheduler,其时间量必须要小于订阅过期时间(默认15秒过期).

记住StompSessionHandler本身也是一个StompFrameHandler.它可以处理ERROR框架相应回调处理异常,这些异常来自处理消息,传输级别的HandleTransportError,包括ConnectionLostException.

26.4.15 WebSocket Scope(作用域)

每一个WebSocket的session都是属性的集合.这个map与客户端传入消息相连,还可以被控制器里的方法访问.例如:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

你也可以申明一个spring管理的bean是websocket作用域.Websocket-scope的bean可以被注入到控制器和任何被注册为"clientInboundChannel"的频道拦截器里.这里bean一般是单例的,存活时间大于单个的webSocket session.所以你需要对WebSocket-scope的bean使用作用域代理模式.

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {

    @PostConstruct
    public void init() {
        // Invoked after dependencies injected
    }

    // ...

    @PreDestroy
    public void destroy() {
        // Invoked when the WebSocket session ends
    }
}

@Controller
public class MyController {

    private final MyBean myBean;

    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }

    @MessageMapping("/action")
    public void handle() {
        // this.myBean from the current WebSocket session
    }
}

同任何自定义作用域一样,spring初始化一个新的Mybean实例,它可以被控制器获取,并存储到webScoket seesion属性里.每次相同的实例都会返回直到session终结.WebSocket作用域的bean会拥有所有的spring生命周期方法调用,如上例示.

26.4.16 配置和性能

谈到性能时,这里没有银弹.许多因素会影响它,包括消息大小,体积,应用方法工作是否需要阻塞,以及外部因素,包括网络速度和其他.本节主要提供了一些可配置选项的简介,还有关于如何合理缩放的思考.

在一个消息应用里,消息在线程池支持下被频道传输并可以被异步操作.配置这样一个应用需要消息频道和流相关的只是.具体可以参考26.4.3节.

显然,首先配置线程池支持的"clientInboundChannel"和"clientOutboundChannel".默认配置的数目是可用处理器的两倍.

如果注解方法处理消息受限于CPU,那么"clientInboundChannel"里的线程的数量应该和处理器数量相同.如果他们的工作更多是首先IO,需要阻塞,等待数据库或其他外部系统,那么线程池容量就可以增加.

ThreadPoolExecutor有三个重要属性.这里是核心线程池容量,线程池最大容量,等待线程最大容量.

最打困扰就是配置核心线程池容量(10)或最大线程池容量(20)这样就配置出一个10到20个线程的线程池.即使你把等待capacity加到最大,即Integer.MAX_VALUE,核心线程池里的线程数量也不会增加,因为只是增加了排队任务的数量.

请查看THreadPoolExecutor的文档来学习这些属性,并理解不同的排队策略.

在"clientOutboundChannel",是关于发送消息到WebSocket客户端.如果客户端的网络比较快,那么线程数量应该与可用处理器数量保持一致.但如果他们太慢或低带宽,接受消息会耗费太长时间,并给线程池添加负担.所以增加线程池容量是有必要的.

"clientInBoundChannel"的工作量很好预测-这个依赖于应用本身,但如何配置"clientOutboundChannel"却很难,因为有太多超出应用本身的因素.因此这里有两个与信息发送相关属性.这是"sendTimeLimit","sendBufferSizeLimit".分别用来配置发送消息到客户端时,发送可以花费多长时间或多大的数据会被缓存.

基本观点,任何单个线程用来发送消息给客户端的时间都是有限制的.同时所有其他消息会被缓冲,你需要使用属性来配置需要消耗多久时间发送一条消息,同时可以缓冲多大的数据.可以通过javadoc或xml文档来配置这些重要的额外细节.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

上面的webSocket传输配置还可以配置接受的STOMP信息的最大数量.尽管理论上webSocket 消息在容量上是无限的,实际上有webSocket服务器限制,例如:tomcat是8K,jetty是64K.基于这个原因,stomp.js把大的STOMP消息分割为16K一份,然后把它们作为混合webSocket消息发送,这要求服务器缓冲并重新组装.

基于webSocket的spring STOMP支持这样,所以应用可以配置消息的最大容量,而无关webSocket server指定的消息容量.记住webSocket消息的容量可以自动调整,只要能保证他们最多传输16K大小的webSocket消息.

例子如下:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

关于收放的最重要一点是使用混成应用实例.目前这个不适于简单代理器.但是适用如RabbitMQ等成熟的代理器.即每个应用实例可以连接到代理器,一个应用的消息广播可以通过代理器广播到其他应用实例相连的WebSocket客户端上.

26.4.17 运行监控

当使用@EnableWebSocketMessageBroker或websocket:message-broker时,关键框架组件会自动收集统计和数据,这可以对整个应用的内在状态提供洞察.这个配置也声明一个WebSocketMessageBrokerStats的bean在某处收集各种可用的信息,默认每30分钟记录为INFO级别.这个bean在运行时可以被通过spring的MBeanExporterd导出到JMX,例如通过JDK的Jsonsole.下面是可用信息的总结.

Client WebSocket Session(客户端 webSocket Session)

  • Current

    表明当前有多少客户端session通过代理器的方式 WebSocket VS Http流和SockJS session长轮询.

  • Total

    目前总计建立了多少session

  • Abnormal Closed(不正常关闭)

  • connect Failure (连接失败) 一个session已经建立,但在60秒内没有收到消息会被关闭.通常是网络或网络代理问题.

  • Send Limit Exceeded (发送过限)

     	发送消息超过配置的时间或缓存数量导致session关闭,一般可能是客户端网速过慢/
    
  • Transport Errors (传输错误)

      	当发生传输错误时,seesion关闭.比如读写WebSocket连接或Http请求/相应失败.
    
  • STOMP Frames

    CONNECT,CONNECTED,DISCONNECT框架周期的数量表明有多少客户端通过STOMP层次连接.记住连接关闭的数量要小于session非正常关闭的数量,或未通过发送DISCONNECT框架关闭的数量.

STOMP Broker Relay (stomp 代理器中介)

  • Tcp Conenctions (TCP连接)

说明有多少代表webSocket客户端的session的指向代理器的TCp连接.这个应该是客户端session的数量+1,另一个是应用内部用于发送消息的系统连接.

  • STOMP Frames (STOMP框架)

    从代理器接受或发送的代表客户端的CONENCT,CONNECTED,DISCONNECT框架的总数.记住一个DISCONECT是发送到代理器的,而与客户端WebSocket sessoin是否关闭无关.所以更少DISCONNECT框架的数量表明代理器挣积极关闭连接,可能是因为心跳为及时到达,一个无效的输入框架,或其他.

  • Client Inbound Channel(客户端输入频道)

来自支持"clientInboundChannel"端的线程池统计提供了入库消息进程的健康状态的审视.任务队列上升表明应用处理消息太慢了.如果这里有I/O方面的任务(例如降低数据库查询,第三方的REST API的http请求),则需要考虑增加线程池容量.

  • Client Outbound Channel (客户输出方面频道)

    从支持"clientOutboundChannel"的线程池得到的统计提供了想客户端广播消息相关的健康状况.排队数量变大,则说明客户端接受消息太慢.一种解决办法是增加线程池的数量来适应当前缓慢客户端的数量.另一种方法是降低发送时间间隔和发送缓冲限制.(查看上节内容)

  • SockJS Task Scheduler(SockJS 任务调度器)

    这个是用来发送心跳的SockJS任务调度器的线程池数量的统计.记住当心跳协商为STOMP层次时,SockJS心跳会失效.

26.4.18 测试标志的控制器方法

这里有两种主要步骤通过spring STOMP对WebSocket的支持来测试应用.一个是服务器测试控制器和它注解的消息处理方法的功能.另一个是写一个全功能的端对端的测试,包括运行着的客户端和服务器.

这两个步骤不是互相排斥的.相反,每一个都有整体测试策略的地点.服务端的测试更加集中在写和处理.端对端集成测试一方面更加完整,测试更多,但他们更多的是啊是啊写和处理.

服务器端测试最简单的方案是写一个控制器单元测试.但是因为控制器依赖它的注解,所以这个不是特别好用.单纯的单元测试无法检测他们.

理想的控制器测试应该在运行时唤醒,就像使用spring mvc测试框架来测试控制器处理http请求一样,无需运行在Servlet容器里但可以通过spring框架来唤醒这个标志控制器.项spring mvc测试一样,这里有两种可能的选择.要么使用"context-based"或"standalone"安装:

    1. 通过spring TestContext framework的帮助来加载真实的spring配置.将"clientInboundChannel"作为一个测试域,使用它发送消息会被控制器方法处理.
  • 2.手动安装能唤醒控制器(名为SimpAnnotationMethodMessageHandler)的最小的spring框架组件,并通过控制器直接发送消息.

这两种计划方案都表现在tests for the stock portfolio项目里.

第二种方案是创建端对端的集成测试.对此,你需要在可行状态下运行一个webSocket服务器,并通过WebSocket客户端连接它,并发送包含了STOMP框架的消息.tests for the stock portfolio项目里也介绍了一种方案,使用Tomcat作为可用服务器和一个简单STOMP客户端以达到测试目的.

展开阅读全文
0
3 收藏
分享
加载中
写的不错哦,支持一下
2019/08/19 15:19
回复
举报
更多评论
打赏
1 评论
3 收藏
0
分享
返回顶部
顶部