WebSocket

Published: 25 May 2020 Category: web

一、WebSocket

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。

现在,很多网站为了实现推送技术,所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。

HTML5 定义的 WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

可以应用在 实时要求高、海量并发的应用(网站)上。比如金融证券得实时信息,web导航应用中的地理未知获取,社交网络的实时推送等。

一个使用WebSocket应用于视频的业务思路如下:

  • 使用心跳维护websocket链路,探测客户端端的网红/主播是否在线
  • 设置负载均衡7层的proxy_read_timeout默认为60s
  • 设置心跳为50s,即可长期保持Websocket不断开

1.1 原理和运行机制

From https://blog.csdn.net/qq_35623773/article/details/87868682

在HTTP中一个request只能有一个response。而且这个response也是被动的,不能主动发起。

首先,相对于HTTP这种非持久的协议来说,Websocket是一个持久化的协议。但是,Websocket是基于HTTP协议的,或者说借用了HTTP的协议来完成一部分握手。

客户端建立一个websocket连接,会发送如下信息:

GET /webfin/websocket/ HTTP/1.1

Host: localhost

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==

Origin: http://localhost:8080

Sec-WebSocket-Version: 13

在HTTP协议中,多了这些东西:

Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==

其中, Upgrade:websocket参数值表明这是WebSocket类型请求,Sec-WebSocket-Key是WebSocket客户端发送的一个 base64编码的密文,要求服务端必须返回一个对应加密的Sec-WebSocket-Accept应答,否则客户端会抛出Error during WebSocket handshake错误,并关闭连接。

响应:

HTTP/1.1 101 Switching Protocols

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=

Sec-WebSocket-Accept的值是服务端采用与客户端一致的密钥计算出来后返回客户端的,具体是客户端传输过来的Sec-WebSocket-Key跟“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”拼接后,用SHA-1加密,并进行BASE-64编码得来的。

HTTP/1.1 101 Switching Protocols表示服务端接受WebSocket协议的客户端连接,客户端收到Sec-WebSocket-Accept后,将本地的Sec-WebSocket-Key进行同样的编码,然后比对。经过这样的请求-响应处理后,两端的WebSocket连接握手成功, 后续就可以进行TCP通讯了。

只需要经过一次HTTP请求,就可以做到源源不断的信息传送了。(在程序设计中,这种设计叫做回调,即:你有信息了再来通知我,而不是我傻乎乎的每次跑来问你)

1.2 协议栈

https://tools.ietf.org/html/rfc6455

二、使用姿势(后端)

以下是 WebSocket 对象的相关事件。

事件 事件处理程序 描述
open Socket.onopen 连接建立时触发
message Socket.onmessage 客户端接收服务端数据时触发
error Socket.onerror 通信发生错误时触发
close Socket.onclose 连接关闭时触发

下面是后端实现的代码实例,用 @ServerEndpoint来指明访问地址,可以加上一些handler来处理消息。

configurator可以去除掉。本文会用它来获取连接的IP(见下文)。

@OnOpen是建立socket触发的事件;其他 @On*都对应上述表格中的事件。只有4种。

@Component
@ServerEndpoint(value = "/ws/v1/exam", configurator = EndpointConfigurator.class)
public class WebSocketHandler {

    private static MessageHandlerService messageHandlerService;

    private static SocketControlService socketControlService;

    @Autowired
    public void setWebSocketHandler(MessageHandlerService messageHandlerService) {
        WebSocketHandler.messageHandlerService = messageHandlerService;
    }

    @Autowired
    public void setSocketControlService(SocketControlService socketControlService) {
        WebSocketHandler.socketControlService = socketControlService;
    }

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        log.info("[{}] open ws", session.getId());
        socketControlService.handleConnect(session);
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("[{}] close ws: {}", session.getId(), reason.toString());
        messageHandlerService.handleClose(session, reason.toString());
        // socketControlService.handleDisConnect(session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        // NOTE: Don't print any error exception.
        // As known, nginx will close the web socket connection after long time inactive, with causing EOFException
        // null. Just ignore those exceptions.
        log.warn("[{}] error in ws connection", session.getId());
        messageHandlerService.handleClose(session, error.getMessage());
        // socketControlService.handleDisConnect(session);
    }

    /**
     * 接收string
     */
    @OnMessage
    public void onMessage(Session session, String messages) {
        log.debug("[{}] receive String: {}", session.getId(), cutStr(messages));
        beginTime.set(System.currentTimeMillis());
        log.info("[begin] {} ", session.getId());

        messageHandlerService.handle(session, messages);

        long timeUsed = System.currentTimeMillis() - beginTime.get();
        log.info("[sessionId:{}, time:{}ms]", session.getId(), timeUsed);

        // 发送消息
        session.getBasicRemote().sendText(String.valueOf(timeUsed));
    }

    /**
     * 接收byte[]
     */
    @OnMessage
    public void onMessage(Session session, byte[] messages) {
        // empty
    }
}

WebSocket接收的消息有两种,一种是Text,一种是Binary,分别对应上述的两种messages参数类型。可以只用一种。

发送消息使用 session.getBasicRemote().sendText()session.getBasicRemote().sendBinary(),也是可以发送Text或二进制内容。

连接websocket使用类似于 ws://172.18.17.151:8085/ws/v1/exam 地址。

在Chrome里可以使用 Simple WebSocket Client插件来调试。

三、踩过的坑和Best practice

建议先看一遍tomcat对websocket的使用介绍,就短短一页内容:http://tomcat.apache.org/tomcat-8.0-doc/web-socket-howto.html

3.1 Socket连接调优

在socket连接过程中,留给程序员可配置的内存大小,主要就两个参数:maxTextMessageBufferSize、maxBinaryMessageBufferSize, 分别对应字符类消息和字节类消息的BufferSize,单位是byte,默认都是8192bytes。

配置参数

@Setter
@Configuration
@ConfigurationProperties(prefix = "webSocket")
public class WebSocketConfig {

    private int maxTextMessageBufferSize = 10 * 1024 * 1024;
    private int maxBinaryMessageBufferSize = 10 * 1024 * 1024;

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(maxTextMessageBufferSize);
        container.setMaxBinaryMessageBufferSize(maxBinaryMessageBufferSize);
        return container;
    }
}

在scoket建立时,会预分配maxTextMessageBufferSize+maxBinaryMessageBufferSize的空间。所以,可以调小这两个数值来增加服务的websocket连接数。如果只用到其中一种消息类型(如Text),可以将另一种设置成0或很小的值。

但是,如果消息大小超过了这个bufferSize,会导致ws断开。所以得根据实际业务情况来估值。

不过,对于那种消息体很大或流式消息的情况,可以使用MessageHandler.Partial 机制,来实现将一个消息拆分成多个call传输。鉴于本次服务没有这种场景,未能继续深究。

除了WebSocket本身的调优,还可以调节JVM的 -Xmx 参数,加大服务的最大运存;还可以及时断开Socket释放资源。

3.2 从Socket中获取IP

根据服务的部署情况,可以通过下面方式来获取Socket中的IP。

基于IP可以添加诸如限制黑白名单、流量限制、地域统计等功能。

3.2.1 直连服务

在客户和服务之间没有中转服务(如正向代理、loadBalance等)时,可以采用很简单的方式获取到IP。

/**
 * 针对webSocket相关的封装
 */
@Slf4j
public class WebSocketUtil {

    /**
     * 向客户端发送消息
     */
    public static void sendMsgToClient(Session session, String message) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] send message: {}", session.getId(), message);
        }
        try {
            session.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("[{}] fail to send message", session.getId(), e);
        }
    }

    public static void sendMsgToClient(Session session, Object object) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] send message: {}", session.getId(), object);
        }
        try {
            session.getBasicRemote().sendText(JsonUtil.toJsonString(object));
        } catch (Exception e) {
            log.error("[{}] fail to send message", session.getId(), e);
        }
    }

    public static InetSocketAddress getRemoteAddress(Session session) {
        if (session == null) {
            return null;
        }
        Async async = session.getAsyncRemote();

        // 在Tomcat 8.0.x版本有效
        // InetSocketAddress socketAddress =
        // (InetSocketAddress) getFieldInstance(async,"base#sos#socketWrapper#socket#sc#remoteAddress");
        // 在Tomcat 8.5以上版本有效
        return (InetSocketAddress) getFieldInstance(async, "base#socketWrapper#socket#sc#remoteAddress");
    }

    private static Object getFieldInstance(Object obj, String fieldPath) {
        String[] fields = fieldPath.split("#");
        for (String field : fields) {
            obj = getField(obj, obj.getClass(), field);
            if (obj == null) {
                return null;
            }
        }
        return obj;
    }

    private static Object getField(Object obj, Class<?> clazz, String fieldName) {
        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
            try {
                Field field;
                field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(obj);
            } catch (Exception e) {
                // just ignore
                // log.debug("Failed to get field");
            }
        }

        return null;
    }

}

业务端调用代码:

    public void handleConnect(Session session) {
        try {
            String address = getAddress(session);
            log.debug("socket ip connected:{}", StringUtil.isBlank(address) ? UNKNOWN : address);
            session.getUserProperties().put(Constants.SESSION_IP, address);
        } catch (Exception ex) {
            // won't throw any exception.
            log.warn("handle connect error, will go on", ex.getMessage());
        }
    }

    private String getAddress(Session session) {
        InetSocketAddress remoteAddress = WebSocketUtil.getRemoteAddress(session);
        if (remoteAddress != null) {
            return remoteAddress.getAddress().getHostAddress();
        } else {
            return null;
        }
    }

原理就是基于Java反射获取Session中的InetSocketAddress,然后读取到客户端的IP地址。再将其存储在session.getUserProperties()里,供后续业务使用。

3.2.2 中转连接服务

在微服务模式和分布式的时代下,客户直连服务的场景基本不复存在。那么如何在中转服务后获取到客户真实的IP呢?

首先,得保证中转服务(如loadBalaner)能透传客户的IP,例如加到请求Header的某个字段里。不然后端也是巧妇难为无米之炊。

接着,得想办法在Session里获取到这个字段。WebSocket是基于TCP协议的,但是没看到和HTTP一样的header,在OnStart事件时拿到的Session,里面包含的信息也很有限,很有可能hSession就不包含header!

那得在前一步拿,就是在TCP的HandShake的时候,将拿到的IP值,作为用户属性传给Session。

第一步,在握手的时候捕获来自中转服务的字段:

import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;

@Slf4j
public class EndpointConfigurator extends ServerEndpointConfig.Configurator {

    private static final String IP_HEADER = "X-Original-Forwarded-For";

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest handshakeRequest,
            HandshakeResponse response) {
        for (Map.Entry<String, List<String>> e : handshakeRequest.getHeaders().entrySet()) {
            // log.debug(JsonUtil.toJsonString(e));
            if (IP_HEADER.equalsIgnoreCase(e.getKey())) {
                config.getUserProperties().put("client-ip", e.getValue().get(0));
            }
        }
    }
}

第二步,在WebSocketHandler加上configurator配置,就用上面的EndpointConfigurator:

@Slf4j
@Component
@ServerEndpoint(value = "/ws/v1/exam", configurator = EndpointConfigurator.class)
public class WebSocketHandler {
    ...
}

第三步,读取信息

    @OnMessage
    public void onMessage(Session session, String messages) {
        String ip = session.getUserProperties().get("client-ip");
        ...
    }

以上实现了从WebSocket获取IP的过程。

REF

WebSocket How-To