Springboot Websocket Stomp 消息訂閱推送
閑話不扯,直奔主題。需要和web前端建立長鏈接,互相實時通訊,因此想到了websocket,后面隨著需求的變更,需要用戶訂閱主題,實現消息的精準推送,發布訂閱等,則想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的簡單文本協議。
websocket協議想到了之前寫的一個websocket長鏈接的demo,也貼上代碼供大家參考。
pom文件直接引入spring-boot-starter-websocket即可。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency>
聲明websocket endpoint
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * @ClassName WebSocketConfig * @Author scott * @Date 2021/6/16 * @Version V1.0 **/@Configurationpublic class WebSocketConfig { /** * 注入一個ServerEndpointExporter,該Bean會自動注冊使用@ServerEndpoint注解申明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter(); }}
websocket實現類,其中通過注解監聽了各種事件,實現了推送消息等相關邏輯
import com.google.common.cache.Cache;import com.google.common.cache.CacheBuilder;import com.ruoyi.common.core.domain.AjaxResult;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.util.Objects;import java.util.Set;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * @ClassName: DataTypePushWebSocket * @Author: scott * @Date: 2021/6/16**/@ServerEndpoint(value = '/ws/dataType/push/{token}')@Componentpublic class DataTypePushWebSocket { private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class); /** * 記錄當前在線連接數 */ private static AtomicInteger onlineCount = new AtomicInteger(0); private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder() .initialCapacity(10) .maximumSize(300) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); /** * 連接建立成功調用的方法 */ @OnOpen public void onOpen(Session session, @PathParam('token')String token) {String sessionId = session.getId();onlineCount.incrementAndGet(); // 在線數加1this.sendMessage('sessionId:' + sessionId +',已經和server建立連接', session);SESSION_CACHE.put(sessionId,session);log.info('有新連接加入:{},當前在線連接數為:{}', session.getId(), onlineCount.get()); } /** * 連接關閉調用的方法 */ @OnClose public void onClose(Session session,@PathParam('token')String token) {onlineCount.decrementAndGet(); // 在線數減1SESSION_CACHE.invalidate(session.getId());log.info('有一連接關閉:{},當前在線連接數為:{}', session.getId(), onlineCount.get()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session,@PathParam('token')String token) {log.info('服務端收到客戶端[{}]的消息:{}', session.getId(), message);this.sendMessage('服務端已收到推送消息:' + message, session); } @OnError public void onError(Session session, Throwable error) {log.error('發生錯誤');error.printStackTrace(); } /** * 服務端發送消息給客戶端 */ private static void sendMessage(String message, Session toSession) {try { log.info('服務端給客戶端[{}]發送消息{}', toSession.getId(), message); toSession.getBasicRemote().sendText(message);} catch (Exception e) { log.error('服務端發送消息給客戶端失?。簕}', e);} } public static AjaxResult sendMessage(String message, String sessionId){Session session = SESSION_CACHE.getIfPresent(sessionId);if(Objects.isNull(session)){ return AjaxResult.error('token已失效');}sendMessage(message,session);return AjaxResult.success(); } public static AjaxResult sendBroadcast(String message){long size = SESSION_CACHE.size();if(size <=0){ return AjaxResult.error('當前沒有在線客戶端,無法推送消息');}ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();Set<String> keys = sessionConcurrentMap.keySet();for (String key : keys) { Session session = SESSION_CACHE.getIfPresent(key); DataTypePushWebSocket.sendMessage(message,session);}return AjaxResult.success(); }}
至此websocket服務端代碼已經完成。
stomp協議前端代碼.這個是在某個vue工程中寫的js,各位大佬自己動手改改即可。其中Settings.wsPath是后端定義的ws地址例如ws://localhost:9003/ws
import Stomp from ’stompjs’import Settings from ’@/settings.js’export default { // 是否啟用日志 默認啟用 debug:true, // 客戶端連接信息 stompClient:{}, // 初始化 init(callBack){ this.stompClient = Stomp.client(Settings.wsPath) this.stompClient.hasDebug = this.debug this.stompClient.connect({},suce =>{ this.console('連接成功,信息如下 ↓') this.console(this.stompClient) if(callBack){callBack() } },err => { if(err) {this.console('連接失敗,信息如下 ↓')this.console(err) } }) }, // 訂閱 sub(address,callBack){ if(!this.stompClient.connected){ this.console('沒有連接,無法訂閱') return } // 生成 id let timestamp= new Date().getTime() + address this.console('訂閱成功 -> '+address) this.stompClient.subscribe(address,message => { this.console(address+' 訂閱消息通知,信息如下 ↓') this.console(message) let data = message.body callBack(data) },{ id: timestamp }) }, unSub(address){ if(!this.stompClient.connected){ this.console('沒有連接,無法取消訂閱 -> '+address) return } let id = '' for(let item in this.stompClient.subscriptions){ if(item.endsWith(address)){id = itembreak } } this.stompClient.unsubscribe(id) this.console('取消訂閱成功 -> id:'+ id + ' address:'+address) }, // 斷開連接 disconnect(callBack){ if(!this.stompClient.connected){ this.console('沒有連接,無法斷開連接') return } this.stompClient.disconnect(() =>{ console.log('斷開成功') if(callBack){callBack() } }) }, // 單位 秒 reconnect(time){ setInterval(() =>{ if(!this.stompClient.connected){this.console('重新連接中...')this.init() } },time * 1000) }, console(msg){ if(this.debug){ console.log(msg) } }, // 向訂閱發送消息 send(address,msg) { this.stompClient.send(address,{},msg) }}
后端stomp config,里面都有注釋,寫的很詳細,并且我加入了和前端的心跳ping pong。
package com.cn.scott.config;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;import org.springframework.web.socket.config.annotation.StompEndpointRegistry;import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;/** * @ClassName: WebSocketStompConfig * @Author: scott * @Date: 2021/7/8**/@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { private static long HEART_BEAT=10000; @Override public void registerStompEndpoints(StompEndpointRegistry registry) {//允許使用socketJs方式訪問,訪問點為webSocket,允許跨域//在網頁上我們就可以通過這個鏈接//ws://127.0.0.1:port/ws來和服務器的WebSocket連接registry.addEndpoint('/ws').setAllowedOrigins('*'); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) {ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();te.setPoolSize(1);te.setThreadNamePrefix('wss-heartbeat-thread-');te.initialize();//基于內存的STOMP消息代理來代替mq的消息代理//訂閱Broker名稱,/user代表點對點即發指定用戶,/topic代表發布廣播即群發//setHeartbeatValue 設置心跳及心跳時間registry.enableSimpleBroker('/user', '/topic').setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);//點對點使用的訂閱前綴,不設置的話,默認也是/user/registry.setUserDestinationPrefix('/user/'); }}
后端stomp協議接受、訂閱等動作通知
package com.cn.scott.ws;import com.alibaba.fastjson.JSON;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.DestinationVariable;import org.springframework.messaging.handler.annotation.MessageMapping;import org.springframework.messaging.simp.SimpMessagingTemplate;import org.springframework.messaging.simp.annotation.SubscribeMapping;import org.springframework.web.bind.annotation.RestController;/** * @ClassName StompSocketHandler * @Author scott * @Date 2021/6/30 * @Version V1.0 **/@RestControllerpublic class StompSocketHandler { @Autowired private SimpMessagingTemplate simpMessagingTemplate; /** * @MethodName: subscribeMapping * @Description: 訂閱成功通知 * @Param: [id] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ @SubscribeMapping('/user/{id}/listener') public void subscribeMapping(@DestinationVariable('id') final long id) {System.out.println('>>>>>>用戶:'+id +',已訂閱');SubscribeMsg param = new SubscribeMsg(id,String.format('用戶【%s】已訂閱成功', id));sendToUser(param); } /** * @MethodName: test * @Description: 接收訂閱topic消息 * @Param: [id, msg] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ @MessageMapping(value = '/user/{id}/listener') public void UserSubListener(@DestinationVariable long id, String msg) {System.out.println('收到客戶端:' +id+',的消息');SubscribeMsg param = new SubscribeMsg(id,String.format('已收到用戶【%s】發送消息【%s】', id,msg));sendToUser(param); } @GetMapping('/refresh/{userId}') public void refresh(@PathVariable Long userId, String msg) {StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format('服務端向用戶【%s】發送消息【%s】', userId,msg));sendToUser(param); } /** * @MethodName: sendToUser * @Description: 推送消息給訂閱用戶 * @Param: [userId] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ public void sendToUser(SubscribeMsg screenChangeMsg){//這里可以控制權限等。。。simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),'/listener', JSON.toJSONString(screenChangeMsg)); } /** * @MethodName: sendBroadCast * @Description: 發送廣播,需要用戶事先訂閱廣播 * @Param: [topic, msg] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ public void sendBroadCast(String topic,String msg){simpMessagingTemplate.convertAndSend(topic,msg); } /** * @ClassName: SubMsg * @Author: scott * @Date: 2021/6/30 **/ public static class SubscribeMsg {private Long userId;private String msg;public SubscribeMsg(Long UserId, String msg){ this.userId = UserId; this.msg = msg;}public Long getUserId() { return userId;}public String getMsg() { return msg;} }}
連接展示
建立連接成功,這里可以看出是基于websocket協議
連接信息
ping pong
調用接口向訂閱用戶1發送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客戶端控制臺查看已經收到了消息。這個時候不同用戶通過自己的userId可以區分訂閱的主題,可以做到通過userId精準的往客戶端推送消息。
還記得我們在后端配置的時候還指定了廣播的訂閱主題/topic,這時我們前端通過js只要訂閱了這個主題,那么后端在像這個主題推送消息時,所有訂閱的客戶端都能收到,感興趣的小伙伴可以自己試試,api我都寫好了。
至此,實戰完畢,喜歡的小伙伴麻煩關注加點贊。
springboot + stomp后端源碼地址:https://gitee.com/ErGouGeSiBaKe/stomp-server
到此這篇關于Springboot Websocket Stomp 消息訂閱推送的文章就介紹到這了,更多相關Springboot Websocket Stomp 消息訂閱推送內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章: