123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package edu.travel.ws.handle;
- import com.alibaba.fastjson.JSON;
- import edu.travel.adapter.service.commodity.ShopUserSessionAdapter;
- import edu.travel.remote.dto.MessageDto;
- import edu.travel.ws.factoryandstragy.MessageFactory;
- import edu.travel.ws.service.MessageService;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Propagation;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.web.socket.CloseStatus;
- import org.springframework.web.socket.PongMessage;
- import org.springframework.web.socket.TextMessage;
- import org.springframework.web.socket.WebSocketSession;
- import org.springframework.web.socket.handler.AbstractWebSocketHandler;
- import java.io.IOException;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- @Component
- public class TravelMessageHandler extends AbstractWebSocketHandler {
- private static ConcurrentHashMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
- private static MessageFactory messageFactory;
- @Autowired
- public void setDeviceListenerService(MessageFactory messageFactory) {
- TravelMessageHandler.messageFactory = messageFactory;
- }
- public static Map<String, WebSocketSession> getSessionMap() {
- return sessionMap;
- }
- /**
- * 连接时调用方法
- * @param session
- * @throws Exception
- */
- @Override
- public void afterConnectionEstablished(WebSocketSession session) throws Exception {
- Map<String, Object> attributes = session.getAttributes();
- }
- /**
- * 连接出错时调用
- * @param session
- * @param exception
- * @throws Exception
- */
- @Override
- public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
- exception.printStackTrace();
- Map<String, Object> attributes = session.getAttributes();
- }
- /**
- * 连接关闭调用的方法
- */
- @Override
- public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
- }
- /**
- * 心跳
- * @param session
- * @param message
- * @throws Exception
- */
- @Override
- protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
- session.sendMessage(message);
- }
- @Override
- @Transactional(propagation = Propagation.REQUIRED)
- public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
- String payload = message.getPayload();
- if (StringUtils.isBlank(payload)) {
- return;
- }
- MessageDto messageDto = JSON.parseObject(payload, MessageDto.class);
- MessageService handlerByCode = messageFactory.getHandlerByCode(messageDto.getCode());
- List<String> ids = handlerByCode.sendMessage(messageDto);
- //对方session
- for (String id : ids) {
- WebSocketSession toSession = sessionMap.get(id);
- if(toSession!=null){
- sendMessage(messageDto,toSession);
- }
- }
- }
- /**
- * 发送系统消息给所有人系统消息
- * @param message
- * @throws Exception
- */
- public void sendAllMessage(TextMessage message) throws Exception {
- sessionMap.forEach( (key,value) ->{
- if (value.isOpen()) {
- try {
- value.sendMessage(message);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
- /**
- * 给单个人系统发送消息
- * @param message
- */
- public void sendMessage(MessageDto message, WebSocketSession session) throws Exception {
- if (session.isOpen()) {
- session.sendMessage(new TextMessage(message.getContent()));
- }
- }
- }
|