在spring boot项目中使用websocket,网上查阅了一些资料,记录一下以备查阅。
使用websocket有两种方式,一种是sockJs,一种是H5方式,个人感觉H5方式更加简单。无论使用哪种方式,都需要引入的POM依赖如下1
2
3
4
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-websocket</artifactId >
</dependency >
H5方式一 配置ServerEndpointExporter 1
2
3
4
5
6
7
8
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter () {
return new ServerEndpointExporter();
}
}
接下来写websocket的具体实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
@ServerEndpoint ("/websocketSsh" )
public class WebSocketSsh {
private static CopyOnWriteArraySet<WebSocketSsh> webSocketSet = new CopyOnWriteArraySet<>();
private Session session;
@OnOpen
public void onOpen (Session session) {
this .session = session;
webSocketSet.add(this );
DICLogger.info("有新的websocket链接加入:{}" ,session.getId());
}
@OnClose
public void onClose () {
webSocketSet.remove(this );
DICLogger.info("链接关闭:{}" ,session.getId());
}
@OnMessage
public void onMessage (String message,Session session) throws Exception {
DICLogger.debug("{}新消息:{}" ,session.getId(),message);
sendMessage("服务器收到消息:" +message);
}
public void sendMessage (String message) throws Exception {
this .session.getBasicRemote().sendText(message);
}
}
首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
上面使用了@ServerEndpoint注解创立websocket endpoint,使用spring boot时需要使用@Component注解声明下,让其纳入spring管理,虽然@Component是单例的,但是spring boot还是会为每一个socket连接初始化一个bean,所以可以使用一个静态的Set保存起来,私有成员变量session也是安全的。
核心是@ServerEndpoint这个注解。这个注解是Javaee标准里的注解,tomcat7以上已经对其进行了实现,如果是用传统方法使用tomcat发布项目,需要要在pom文件中引入javaee标准。1
2
3
4
5
6
<dependency >
<groupId > javax</groupId >
<artifactId > javaee-api</artifactId >
<version > 7.0</version >
<scope > provided</scope >
</dependency >
前端的代码很简单,简单实现如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<body >
<div >
<input id ="text" type ="text" />
<button onclick ="send()" > 发送</button >
<button onclick ="close()" > 关闭2</button >
</div >
<div id ="message" > </div >
<script >
var $msg = $("#message" );
var websocket = new WebSocket("ws://localhost:9092/websocketSsh" );
websocket.onerror = function ( ) {
showMessage("error" );
}
websocket.onopen = function (event ) {
showMessage("open" );
}
websocket.onmessage = function (event ) {
console .log(event);
showMessage(event.data);
}
websocket.onclose = function ( ) {
showMessage("close" );
}
window .onbeforeunload = function ( ) {
websocket.close();
}
function send ( ) {
var text = $("#text" ).val();
websocket.send(text);
}
function close ( ) {
websocket.close();
}
function showMessage (message ) {
$msg.append("<div>" +message+"</div>" )
}
</script >
</body >
H5方式二 第一种方式有一个缺点,在WebSocketSsh类里面无法使用@Value或@Autowired之类的Spring注入,所以更简单的结合spring的使用方式如下:
配置websocket 1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers (WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketSshHandler(),"/websocketSsh" ).setAllowedOrigins("*" );
}
@Bean
public WebSocketSshHandler webSocketSshHandler () {
return new WebSocketSshHandler();
}
}
注意:需要加上@EnableWebSocket注解
registry.addHandler的第一个参数是WebSocketSshHandler单例bean,这里不能直接传入new WebSocketSshHandler()
,只能直接调用webSocketSshHandler()
方法,开始我没搞懂这两者有什么区别,测试时发现直接new WebSocketSshHandler()
是无法纳入spring管理的。个人猜测在spring创建bean阶段,如果调用了另一个被@Bean
注解了的方法,spring会优先创建这个bean再返回其果。
简单测试下1
2
3
4
5
6
7
8
9
10
11
@Override
public void registerWebSocketHandlers (WebSocketHandlerRegistry registry) {
DICLogger.error("registerWebSocketHandlers" );
registry.addHandler(webSocketSshHandler(),"/ssh" ).setAllowedOrigins("*" );
DICLogger.error("registerWebSocketHandlers end" );
}
@Bean (initMethod = "init" )
public WebSocketSshHandler webSocketSshHandler () {
DICLogger.error("webSocketSshHandler" );
return new WebSocketSshHandler();
}
WebSocketSshHandler类有个init方法,里面执行一句打印DICLogger.error("WebSocketSshHandler init");
。启动boot程序,打印顺序如下1
2
3
4
2018-01-12 14:58:28.802 ERROR 10132 registerWebSocketHandlers
2018-01-12 14:58:28.809 ERROR 10132 webSocketSshHandler
2018-01-12 14:58:28.842 ERROR 10132 WebSocketSshHandler init
2018-01-12 14:58:28.865 ERROR 10132 registerWebSocketHandlers end
测试结束,spring牛逼。
WebSocketSshHandler类实现如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class WebSocketSshHandler extends TextWebSocketHandler {
@Autowired
SshService sshService;
@Override
public void afterConnectionEstablished (WebSocketSession session) throws Exception {
DICLogger.info("WebSocketSshHandler:有新的websocket链接加入:{}" ,session.getId());
}
@Override
protected void handleTextMessage (WebSocketSession session, TextMessage message) throws Exception {
DICLogger.debug("{}新消息:{}" ,session.getId(),message.getPayload());
sshService.sendMessage(session,"WebSocketSshHandler:" +message.getPayload());
}
@Override
public void afterConnectionClosed (WebSocketSession session, CloseStatus status) throws Exception {
DICLogger.info("WebSocketSshHandler:链接关闭:{}" ,session.getId());
}
public void init () {
DICLogger.error("WebSocketSshHandler init" );
}
}
前端的使用方式一致。
sockJs方式 spring官方提供了websocket各浏览器兼容方案,基于SockJs协议封装对用户透明的模拟websocket的备选方案,在支持websocket的浏览器使用websocket,其他浏览器会尝试使用ajax streaming或者Iframe等方式达到相同效果
配置 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker (MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic" ,"/user" );
registry.setApplicationDestinationPrefixes("/app" );
registry.setUserDestinationPrefix("/user" );
}
@Override
public void registerStompEndpoints (StompEndpointRegistry registry) {
registry.addEndpoint("/endpoint" ).setAllowedOrigins("*" ).withSockJS();
}
@Override
public void configureClientInboundChannel (ChannelRegistration registration) {
registration.setInterceptors(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
//mark1:这块注释的部分是模拟设置用户信息,用户信息通过header传过来做校验,当然用户校验也可以放在tcp握手阶段
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
List<String> userNames = accessor.getNativeHeader("username");
if(!userNames.isEmpty()&& StringUtils.isNotBlank(userNames.get(0))){
String username = userNames.get(0);
accessor.setUser(new Principal() {
@Override
public String getName() {
return username;
}
});
}else{
DICLogger.info("username不能为空,连接失败");
//返回null,Controller将收不到消息
return null;
}
}*/
return message;
}
});
}
@Override
public void configureWebSocketTransport (WebSocketTransportRegistration registry) {
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate (WebSocketHandler handler) {
return new WebSocketHandlerDecorator(handler) {
@Override
public void handleMessage (WebSocketSession session, WebSocketMessage<?> message) throws Exception {
super .handleMessage(session,message);
}
@Override
public void afterConnectionEstablished (WebSocketSession session) throws Exception {
DICLogger.info("afterConnectionEstablished:有新的连接加入,sessionId:{},username:{}" ,session.getId(),session.getPrincipal().getName());
super .afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed (WebSocketSession session, CloseStatus closeStatus) throws Exception {
DICLogger.info("afterConnectionEstablished:连接断开,sessionId:{},username:{}" ,session.getId(),session.getPrincipal().getName());
super .afterConnectionClosed(session,closeStatus);
}
};
}
});
super .configureWebSocketTransport(registry);
}
@Bean
public StompConnectEventListener stompConnectEventListener () {
return new StompConnectEventListener();
}
@Bean
public StompDisconnectEventListener stompDisconnectEventListener () {
return new StompDisconnectEventListener();
}
}
上面的代码给出了配置socket的方式
mark 1:这里可以在连接建立的时候设置用户信息的,用户信息与session一一对应,用户信息应保持唯一性。这个用户信息在服务端主动推送消息到某个具体客户端的时候非常重要。这里暂时注释掉,因为在这里即便是用户验证不通过或是用户信息重复也无法阻止连接的建立,最好是在握手阶段验证和设置用户信息。
mark 2:handleMessage
方法可以处理消息,目前测试时只有客户端发送到服务器的消息能在这被处理,在这方法里可以对参数做一些处理如校验等,也可以关闭连接,与拦截器不同的是这里的message是org.springframework.web.socket.WebSocketMessage
接口实例,而拦截器中的message是org.springframework.messaging.Message
接口实例,Message
接口提供了获取header的方法而WebSocketMessage
接口没有,所以在super.handleMessage
方法中就是将WebSocketMessage
解析处理成Message
。测试时尝试过自行转换和处理,这样就可以在这校验用户并决定是否主动关闭连接,可行,但是很多方法和属性是私有,担心稳定性所以作罢。
mark 3、mark 4:afterConnectionEstablished
在连接建立后调用,可以用来监听连接建立,同样afterConnectionClosed
方法可以监听连接断开。实际应用中可以全局建立WebSocketSession注册机制,static ConcurrentMap<String,WebSocketSession> sessionRegistry
可以在任意地方由服务端主动关闭连接。
spring 还实现了基于事件的方式来监听连接的建立和关闭,上面的两个@Bean就是创建两个监听器,代码如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class StompConnectEventListener implements ApplicationListener <SessionConnectEvent > {
@Override
public void onApplicationEvent (SessionConnectEvent sessionConnectEvent) {
StompHeaderAccessor sha = StompHeaderAccessor.getAccessor(sessionConnectEvent.getMessage(),StompHeaderAccessor.class);
DICLogger.info("StompConnectEventListener:有新的连接加入:sessionID:{},username:{}" ,sha.getSessionId(),sha.getUser().getName());
}
}
public class StompDisconnectEventListener implements ApplicationListener <SessionDisconnectEvent > {
@Override
public void onApplicationEvent (SessionDisconnectEvent sessionDisconnectEvent) {
StompHeaderAccessor sha = StompHeaderAccessor.getAccessor(sessionDisconnectEvent.getMessage(),StompHeaderAccessor.class);
DICLogger.info("StompDisconnectEventListener:连接断开:sessionID:{},username:{}" ,sha.getSessionId(),sha.getUser().getName());
}
}
tip:在这个监听器中不能获取到WebSocketSession
实例。
如果配置了监听,在连接建立时,先执行afterConnectionEstablished
在执行StompConnectEventListener onApplicationEvent
。1
2
2018-01-19 15:59:23.071 INFO WebSocketStompConfig$4$1 : afterConnectionEstablished:有新的连接加入,sessionId:1slq4is3,username:qq
2018-01-19 15:59:28.744 INFO StompConnectEventListener : StompConnectEventListener:有新的连接加入:sessionID:1slq4is3,username:qq
写Controller 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Controller
public class WebSocketController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@MessageMapping ("/hello" )
@SendTo ("/topic/greeting" )
public String greeting (@Header("atytopic" ) String topic, @Headers Map<String, Object> headers,Map param) {
DICLogger.info("topic:" +topic);
DICLogger.info("greeting headers:" + JSONUtil.toJSONString(headers));
return "返回消息:" +JSONUtil.toJSONString(param);
}
@MessageMapping ("/message" )
public void handleMessage (StompHeaderAccessor headerAccessor, @Headers Map<String, Object> headers, Map param) {
DICLogger.info("message headers:" + JSONUtil.toJSONString(headers));
DICLogger.info("message:" + JSONUtil.toJSONString(param));
String username = headerAccessor.getUser().getName();
messagingTemplate.convertAndSendToUser(username,"/queue/message" ,"收到消息:" +headerAccessor.getSessionId());
}
}
Controller写法跟springMVC的方式几乎一致,不同的是方法地址映射改成@MessageMapping
。 上例中@MessageMapping("/hello")
中的\hello
就是客户端发送消息的地址,js代码如下1
stompClient.send("/app/hello" , {atytopic :"greetings" }, JSON .stringify({ 'message' : $('#message' ).val(),'username' :username }));
第一个参数就是地址,app是配置中configureMessageBroker
方法配置的前缀registry.setApplicationDestinationPrefixes("/app");
。
@SendTo("/topic/greeting")
是服务端广播发向客户端的地址,可以理解为一个主题,所有订阅了这个主题的客户端才会收到消息。客户端监听代码如下1
2
3
4
5
stompClient.subscribe('/topic/greeting' , function (frame ) {
console .log("receive" );
console .log(frame);
showMessage(frame.body);
});
很多博客都推荐使用这种注解的方式指定客户端订阅地址,感觉这又回到http的一应一答的方式了,个人不太喜欢,推荐java编码的方式使用SimpMessagingTemplate类发送消息。
websocket最常用的使用场景是服务器发送消息到某一个具体的客户端,上面代码中messagingTemplate.convertAndSendToUser(username,"/queue/message","收到消息:"+headerAccessor.getSessionId());
就是发送消息到具体的一个客户端。这方法的第一个参数是用户名称,第二个参数是客户端的监听地址,第三个参数是消息实体。这里有一个不算坑的坑,反正我在这卡了好几天,messagingTemplate.convertAndSendToUser
这个方法源码文档也没有对参数给出详细的说明,很多博客也没说明第一个参数username到底是个什么,官方文档我也没翻到详细解释,最开始我一直以为是sessionId,然后各种尝试,最后在一篇博客中翻到一个博主在方法中注入了StompHeaderAccessor
对象,就是上例中的public void handleMessage(StompHeaderAccessor headerAccessor, @Headers Map<String, Object> headers, Map param)
,StompHeaderAccessor
实例中有个很重要的方法getUser()
,同样也有个setUser(Principal principal)
,它们的实现是在SimpMessageHeaderAccessor
类中,源码如下:1
2
3
4
5
6
7
8
9
public void setUser (Principal principal) {
setHeader(USER_HEADER, principal);
}
* Return the user associated with the current session.
*/
public Principal getUser () {
return (Principal) getHeader(USER_HEADER);
}
最后尝试调用这两个方法设置自定义用户信息,测试通过。所以,messagingTemplate.convertAndSendToUser
方法的第一个参数就是用户信息,在上面配置中mark1的注释块中就是怎样设置用户信息(也可以在握手阶段就设置用户信息,后面给出例子)。
messagingTemplate.convertAndSendToUser
方法第二个参数是目标地址,即客户端监听地址,上例中的目标地址是/queue/message
,那么客户端就需要监听这个地址,不同于监听的广播地址,客户端需要加上/user
前缀,即配置中registry.setUserDestinationPrefix("/user");
设置的值,客户端代码如下:1
2
3
4
stompClient.subscribe('/user/' +username+'/queue/message' ,function (frame ) {
console .log(frame);
showMessage(frame.body);
});
上面那个username值就是messagingTemplate.convertAndSendToUser
第一个参数,这个值必须在服务端保持唯一。
前端代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<!DOCTYPE html>
<html lang ="en" >
<head >
<title > Hello WebSocket</title >
<script src ="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js" > </script >
<script src ="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js" > </script >
<script src ="http://cdn.bootcss.com/jquery/3.1.1/jquery.min.js" > </script >
<script type ="text/javascript" >
var stompClient = null ;
function setConnected (connected ) {
document .getElementById('connect' ).disabled = connected;
document .getElementById('disconnect' ).disabled = !connected;
document .getElementById('conversationDiv' ).style.visibility = connected ? 'visible' : 'hidden' ;
document .getElementById('response' ).innerHTML = '' ;
}
var username = "" ;
function connect ( ) {
username = $("#username" ).val();
var socket = new SockJS("/endpoint" );
stompClient = Stomp.over(socket);
stompClient.connect({username :username}, function (frame ) {
setConnected(true );
console .log(frame);
console .log('Connected: ' + frame);
stompClient.subscribe('/topic/greeting' , function (frame ) {
console .log("receive" );
console .log(frame);
showMessage(frame.body);
});
stompClient.subscribe('/user/' +username+'/queue/message' ,function (frame ) {
console .log(frame);
showMessage(frame.body);
});
},function (error ) {
console .log("连接失败" );
console .log(error);
stompClient.disconnect();
});
}
function hello ( ) {
stompClient.send("/app/hello" , {atytopic :"greetings" }, JSON .stringify({ 'message' : $('#message' ).val(),'username' :username }));
}
function message ( ) {
stompClient.send("/app/message" , {atytopic :"message" }, JSON .stringify({ 'message' : $('#message' ).val(),'username' :username }));
}
function disconnect ( ) {
if (stompClient != null ) {
stompClient.disconnect();
}
setConnected(false );
console .log("Disconnected" );
}
function showMessage (message ) {
var $response = $("#response" );
$response.append($("<p>" +message+"</p>" ));
}
</script >
</head >
<body >
<div >
<div >
<button id ="connect" onclick ="connect();" > Connect</button >
<button id ="connectAny" onclick ="connectAny();" > ConnectAny</button >
<button id ="disconnect" disabled ="disabled" onclick ="disconnect();" > Disconnect</button >
</div >
<div id ="conversationDiv" >
<label > username</label > <input type ="text" id ="username" value ="qq" /> <br />
<label > message</label > <input type ="text" id ="message" value ="1" />
<button id ="sendName" onclick ="hello();" > Send hello</button >
<button id ="sendMessage" onclick ="message();" > Send message</button >
<p id ="response" > </p >
</div >
</div >
</body >
</html >
js代码中建立连接时在header中传入了用户名stompClient.connect({username:username},....
,在mark1代码块中就能从StompHeaderAccessor
实例中获取到username。
在握手阶段验证并设置用户信息
修改registerStompEndpoints方法,添加拦截器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void registerStompEndpoints (StompEndpointRegistry registry) {
StompWebSocketEndpointRegistration ser = registry.addEndpoint("/endpoint" ).setAllowedOrigins("*" );
ser.addInterceptors(new HandshakeInterceptor() {
@Override
public boolean beforeHandshake (ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
ServletServerHttpRequest req = (ServletServerHttpRequest) request;
String username = req.getServletRequest().getParameter("username" );
if (StringUtils.isBlank(username)){
return false ;
}
Principal user = new Principal() {
@Override
public String getName () {
return username;
}
};
attributes.put("user" ,user);
return true ;
}
@Override
public void afterHandshake (ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
});
ser.setHandshakeHandler(new DefaultHandshakeHandler(){
@Override
protected Principal determineUser (ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
return (Principal)attributes.get("user" );
}
});
ser.withSockJS();
}
在握手阶段,数据是http协议传输的,用户信息需要通过http传递,所以客户端连接方式需要改成var socket = new SockJS("/endpoint?username="+username);
。 如果beforeHandshake
方法返回false,连接会失败。sockj会转入ajax轮询模式,关闭轮询模式的方法暂时没找到。