WebSocketHandler 处理器注册
package com.dhcc.framework.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
 * Registers the application's raw {@link WebSocketHandler} endpoint.
 *
 * <p>The handler returned by {@link #myHandler()} is mapped to
 * {@code /myHandler} and the mapping is created with SockJS enabled.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /** URL path under which the handler is registered. */
    private static final String HANDLER_PATH = "/myHandler";

    /**
     * Creates the handler instance that is registered for {@code /myHandler}.
     *
     * @return a new {@link MyHandler}
     */
    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

    /**
     * Maps {@link #myHandler()} to {@code /myHandler} with SockJS enabled.
     *
     * @param registry registry that collects the handler mappings
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        WebSocketHandler handler = myHandler();
        registry.addHandler(handler, HANDLER_PATH).withSockJS();
    }
}
MyHandler
package com.dhcc.framework.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
/**
 * Minimal {@code WebSocketHandler} that logs connection lifecycle events,
 * greets the peer once the connection is established and logs every text
 * message it receives.
 */
@Slf4j
public class MyHandler implements org.springframework.web.socket.WebSocketHandler {

    /**
     * Logs the new connection and sends a fixed greeting text message to the peer.
     *
     * @param session the session that has just been opened
     * @throws Exception if sending the greeting fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("建立连接触发的方法");
        log.info("afterConnectionEstablished: {}",session.getId());
        session.sendMessage(new TextMessage("你好,我是客户端"));
    }

    /**
     * Logs the payload of an incoming text message.
     *
     * <p>Only {@code String} payloads are logged as text. Any other payload
     * (for example a binary frame) is reported and skipped; previously such a
     * message caused a {@link ClassCastException} because the payload was cast
     * to {@code String} unconditionally.
     *
     * @param session the session the message arrived on
     * @param message the received message
     * @throws Exception declared by the interface; not thrown by this implementation
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        Object payload = message.getPayload();
        if (!(payload instanceof String)) {
            // Guard against non-text frames instead of failing with a ClassCastException.
            log.warn("Ignoring non-text WebSocket message of type {} on session {}",
                    message.getClass().getSimpleName(), session.getId());
            return;
        }
        String msg = (String) payload;
        log.info("客户端发来消息:{}",msg);
    }

    /**
     * Logs an error reported by the underlying message transport.
     *
     * @param session   the session on which the error occurred
     * @param exception the transport error
     * @throws Exception declared by the interface; not thrown by this implementation
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("传输出现错误",exception);
    }

    /**
     * Logs that the connection has been closed. No message is sent to the peer here.
     *
     * @param session     the session that was closed
     * @param closeStatus the reason the connection was closed (currently not logged)
     * @throws Exception declared by the interface; not thrown by this implementation
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        log.warn("连接被关闭");
    }

    /**
     * Reports that this handler does not accept partial messages, so each call
     * to {@link #handleMessage(WebSocketSession, WebSocketMessage)} is expected
     * to carry a complete message.
     *
     * @return always {@code false}
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}
WebSocketMessageBroker消息代理配置
package com.dhcc.framework.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
/**
 * STOMP-over-WebSocket message broker configuration.
 *
 * <p>Registers the {@code /sockjs} STOMP endpoint with SockJS enabled, raises
 * the inbound message size limit and enables the simple in-memory broker for
 * the {@code /topic} and {@code /user} destination prefixes. Messages sent to
 * destinations starting with {@code /send} are treated as application
 * destinations.
 */
@Configuration
@Slf4j
public class WebSocketMessageBrokerConfig extends WebSocketMessageBrokerConfigurationSupport {

    /** Maximum size of an incoming message: 20 MiB. */
    private static final int MESSAGE_SIZE_LIMIT_BYTES = 20 * 1024 * 1024;

    /** Path of the STOMP endpoint clients connect to. */
    private static final String STOMP_ENDPOINT = "/sockjs";

    /** SockJS streaming limit in bytes: 512 KiB. */
    private static final int STREAM_BYTES_LIMIT = 512 * 1024;

    /** SockJS HTTP message cache size. */
    private static final int HTTP_MESSAGE_CACHE_SIZE = 1000;

    /** SockJS disconnect delay in milliseconds: 30 seconds. */
    private static final long DISCONNECT_DELAY_MS = 30 * 1000;

    /** Destination prefixes served by the simple broker ("/user" is used for user chat). */
    private static final String[] BROKER_PREFIXES = {"/topic", "/user"};

    /** Prefix of destinations that are handled by the application. */
    private static final String APPLICATION_PREFIX = "/send";

    /**
     * Raises the message size limit above the 64K default mentioned by the
     * original author.
     *
     * @param registry transport registration to customize
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        log.info("init configureWebSocketTransport");
        registry.setMessageSizeLimit(MESSAGE_SIZE_LIMIT_BYTES);
    }

    /**
     * Registers the {@code /sockjs} STOMP endpoint with SockJS enabled and
     * tunes its stream limit, message cache size and disconnect delay.
     *
     * @param registry registry that collects the STOMP endpoints
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        log.info("init registerStompEndpoints websocket 上下文");
        registry.addEndpoint(STOMP_ENDPOINT)
                .withSockJS()
                .setStreamBytesLimit(STREAM_BYTES_LIMIT)
                .setHttpMessageCacheSize(HTTP_MESSAGE_CACHE_SIZE)
                .setDisconnectDelay(DISCONNECT_DELAY_MS);
    }

    /**
     * Enables the simple broker for {@code /topic} and {@code /user} and sets
     * {@code /send} as the application destination prefix.
     *
     * <p>A STOMP broker relay (for example RabbitMQ's {@code amq.topic}
     * exchange) was considered by the original author but is not enabled.
     *
     * @param config broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        log.info("init configureMessageBroker");
        config.enableSimpleBroker(BROKER_PREFIXES);
        config.setApplicationDestinationPrefixes(APPLICATION_PREFIX);
    }
}
客户端
package com.dhcc.project.evaluate.websocket.client;
import com.dhcc.framework.config.MyHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@Component
@AllArgsConstructor
@Slf4j
public class EvaluateStompClient {
//private final WebSocketHandler myHandler;
public void startClient(final String message) {
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.add("username", "guest");
headers.add("password", "guest");
// 可以在这里添加认证头部信息
List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());
SockJsClient sockJsClient = new SockJsClient(transports);
//com.dhcc.framework.config.WebSocketMessageBrokerConfig.registerStompEndpoints
ListenableFuture<WebSocketSession> future = sockJsClient.doHandshake(new MyHandler(), headers, URI.create("ws://localhost:5011/sockjs"));
sockJsClient.start();
future.addCallback(new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onFailure(Throwable ex) {
}
@Override
public void onSuccess(WebSocketSession result) {
log.info("发送成功");
//result.sendMessage(message);
}
});
}
}