Spring WebSocket 笔记

原创
2024/12/11 11:19
阅读数 57

WebSocketHandler 处理器注册

package com.dhcc.framework.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * Registers the plain (non-STOMP) WebSocket handler.
 *
 * <p>The handler returned by {@link #myHandler()} is mapped to {@code /myHandler}
 * with the SockJS option turned on.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /**
     * Declares the handler as a bean.
     *
     * @return a new {@link MyHandler}
     */
    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

    /**
     * Maps the handler bean to its URL path.
     *
     * @param registry registry the handler mapping is added to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        WebSocketHandler handler = myHandler();
        registry
                .addHandler(handler, "/myHandler")
                .withSockJS();
    }

}

MyHandler

package com.dhcc.framework.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

/**
 * Logging WebSocket handler: greets every newly opened session with a text message and
 * logs incoming messages, transport errors and connection closes.
 */
@Slf4j
public class MyHandler implements org.springframework.web.socket.WebSocketHandler{
    /**
     * Invoked after WebSocket negotiation has succeeded and the WebSocket connection is
     * opened and ready for use. Logs the session id and sends a greeting text message.
     *
     * @param session the session that has just been opened
     * @throws Exception if sending the greeting fails; propagated to the caller
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("建立连接触发的方法");
        log.info("afterConnectionEstablished: {}",session.getId());
        session.sendMessage(new TextMessage("你好,我是客户端"));

    }

    /**
     * Invoked when a new WebSocket message arrives. Text payloads are logged as-is;
     * any other payload is logged by message type and length only.
     *
     * @param session the session the message arrived on
     * @param message the incoming message
     * @throws Exception declared by the interface; this implementation only logs
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        Object payload = message.getPayload();
        if (payload instanceof String) {
            log.info("客户端发来消息:{}",payload);
            return;
        }
        // Non-text messages carry a non-String payload; a blind cast to String
        // would fail with ClassCastException, so only describe the message.
        log.info("收到非文本消息,类型:{},长度:{}", message.getClass().getSimpleName(), message.getPayloadLength());
    }

    /**
     * Handle an error from the underlying WebSocket message transport by logging it.
     *
     * @param session   the session on which the error occurred
     * @param exception the transport error
     * @throws Exception declared by the interface; this implementation only logs
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("传输出现错误",exception);
    }

    /**
     * Invoked after the WebSocket connection has been closed by either side, or after a
     * transport error has occurred. Although the session may technically still be open,
     * depending on the underlying implementation, sending messages at this point is
     * discouraged and most likely will not succeed.
     *
     * @param session     the session that was closed
     * @param closeStatus the reason the connection was closed
     * @throws Exception declared by the interface; this implementation only logs
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        log.warn("连接被关闭");
    }

    /**
     * Whether the WebSocketHandler handles partial messages. If this flag is set to
     * {@code true} and the underlying WebSocket server supports partial messages,
     * then a large WebSocket message, or one of an unknown size may be split and
     * maybe received over multiple calls to
     * {@link #handleMessage(WebSocketSession, WebSocketMessage)}. The flag
     * {@link WebSocketMessage#isLast()} indicates if
     * the message is partial and whether it is the last part.
     *
     * @return always {@code false}
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}

WebSocketMessageBroker 消息代理配置

package com.dhcc.framework.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

/**
 * Message-broker configuration: sets the transport message size limit, registers the
 * {@code /sockjs} STOMP endpoint with SockJS enabled, and configures the broker and
 * application destination prefixes.
 */
@Configuration
@Slf4j
public class WebSocketMessageBrokerConfig extends WebSocketMessageBrokerConfigurationSupport {

    /**
     * Raises the message size limit to 20 * 1024 * 1024 bytes.
     *
     * @param registry transport registration to configure
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        log.info("init configureWebSocketTransport");
        // The default value is 64K (i.e. 64 * 1024).
        final int messageSizeLimit = 20 * 1024 * 1024;
        registry.setMessageSizeLimit(messageSizeLimit);
    }


    /**
     * Registers the {@code /sockjs} endpoint and tunes its SockJS options.
     *
     * @param registry endpoint registry to add the endpoint to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        log.info("init registerStompEndpoints websocket 上下文");

        // "/sockjs" is the WebSocket connection path; SockJS is a WebSocket communication JS library.
        SockJsServiceRegistration sockJs = registry.addEndpoint("/sockjs").withSockJS();
        sockJs.setStreamBytesLimit(512 * 1024);
        sockJs.setHttpMessageCacheSize(1000);
        sockJs.setDisconnectDelay(30 * 1000);

        // Disabled alternative endpoint with the same SockJS options:
        // registry.addEndpoint("/handshake")
        //         .withSockJS()
        //         .setStreamBytesLimit(512 * 1024)
        //         .setHttpMessageCacheSize(1000)
        //         .setDisconnectDelay(30 * 1000);
    }


    /**
     * Configures the message broker: the simple broker is enabled for the
     * {@code /topic} and {@code /user} prefixes, and {@code /send} is set as the
     * application destination prefix.
     *
     * @param config broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        log.info("init configureMessageBroker");

        // Topic path prefixes; "/user" is used for user-to-user chat.
        final String topicPrefix = "/topic";
        final String userPrefix = "/user";
        config.enableSimpleBroker(topicPrefix, userPrefix);

        // Disabled relay variant: would subscribe to the queues defined in the bindings
        // of RabbitMQ's default exchange amq.topic.
        // config.enableStompBrokerRelay("/topic");

        // Application path prefix.
        final String applicationPrefix = "/send";
        config.setApplicationDestinationPrefixes(applicationPrefix);
    }
}

客户端

package com.dhcc.project.evaluate.websocket.client;


import com.dhcc.framework.config.MyHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

@Component
@AllArgsConstructor
@Slf4j
public class EvaluateStompClient {
    //private final WebSocketHandler myHandler;
    public void startClient(final String message) {
        WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
        headers.add("username", "guest");
        headers.add("password", "guest");
        // 可以在这里添加认证头部信息
        List<Transport> transports = new ArrayList<>(2);
        transports.add(new WebSocketTransport(new StandardWebSocketClient()));
        transports.add(new RestTemplateXhrTransport());
        SockJsClient sockJsClient = new SockJsClient(transports);

        //com.dhcc.framework.config.WebSocketMessageBrokerConfig.registerStompEndpoints
        ListenableFuture<WebSocketSession> future = sockJsClient.doHandshake(new MyHandler(), headers, URI.create("ws://localhost:5011/sockjs"));
        sockJsClient.start();
        future.addCallback(new ListenableFutureCallback<WebSocketSession>() {
            @Override
            public void onFailure(Throwable ex) {

            }

            @Override
            public void onSuccess(WebSocketSession result) {
                log.info("发送成功");
                //result.sendMessage(message);
            }
        });
    }
}

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部