package edu.travel.ws.handle; import com.alibaba.fastjson.JSON; import edu.travel.adapter.service.commodity.ShopUserSessionAdapter; import edu.travel.remote.dto.MessageDto; import edu.travel.ws.factoryandstragy.MessageFactory; import edu.travel.ws.service.MessageService; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Component public class TravelMessageHandler extends AbstractWebSocketHandler { private static ConcurrentHashMap sessionMap = new ConcurrentHashMap<>(); private static MessageFactory messageFactory; @Autowired public void setDeviceListenerService(MessageFactory messageFactory) { TravelMessageHandler.messageFactory = messageFactory; } public static Map getSessionMap() { return sessionMap; } /** * 连接时调用方法 * @param session * @throws Exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { Map attributes = session.getAttributes(); } /** * 连接出错时调用 * @param session * @param exception * @throws Exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { exception.printStackTrace(); Map attributes = session.getAttributes(); } /** * 连接关闭调用的方法 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { } /** * 心跳 * @param session * @param message * @throws Exception */ @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { session.sendMessage(message); } @Override @Transactional(propagation = Propagation.REQUIRED) public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String payload = message.getPayload(); if (StringUtils.isBlank(payload)) { return; } MessageDto messageDto = JSON.parseObject(payload, MessageDto.class); MessageService handlerByCode = messageFactory.getHandlerByCode(messageDto.getCode()); List ids = handlerByCode.sendMessage(messageDto); //对方session for (String id : ids) { WebSocketSession toSession = sessionMap.get(id); if(toSession!=null){ sendMessage(messageDto,toSession); } } } /** * 发送系统消息给所有人系统消息 * @param message * @throws Exception */ public void sendAllMessage(TextMessage message) throws Exception { sessionMap.forEach( (key,value) ->{ if (value.isOpen()) { try { value.sendMessage(message); } catch (IOException e) { throw new RuntimeException(e); } } }); } /** * 给单个人系统发送消息 * @param message */ public void sendMessage(MessageDto message, WebSocketSession session) throws Exception { if (session.isOpen()) { session.sendMessage(new TextMessage(message.getContent())); } } }