TravelMessageHandler.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package edu.travel.ws.handle;
  2. import com.alibaba.fastjson.JSON;
  3. import edu.travel.adapter.service.commodity.ShopUserSessionAdapter;
  4. import edu.travel.remote.dto.MessageDto;
  5. import edu.travel.ws.factoryandstragy.MessageFactory;
  6. import edu.travel.ws.service.MessageService;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.transaction.annotation.Propagation;
  11. import org.springframework.transaction.annotation.Transactional;
  12. import org.springframework.web.socket.CloseStatus;
  13. import org.springframework.web.socket.PongMessage;
  14. import org.springframework.web.socket.TextMessage;
  15. import org.springframework.web.socket.WebSocketSession;
  16. import org.springframework.web.socket.handler.AbstractWebSocketHandler;
  17. import java.io.IOException;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.concurrent.ConcurrentHashMap;
  21. @Component
  22. public class TravelMessageHandler extends AbstractWebSocketHandler {
  23. private static ConcurrentHashMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
  24. private static MessageFactory messageFactory;
  25. @Autowired
  26. public void setDeviceListenerService(MessageFactory messageFactory) {
  27. TravelMessageHandler.messageFactory = messageFactory;
  28. }
  29. public static Map<String, WebSocketSession> getSessionMap() {
  30. return sessionMap;
  31. }
  32. /**
  33. * 连接时调用方法
  34. * @param session
  35. * @throws Exception
  36. */
  37. @Override
  38. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  39. Map<String, Object> attributes = session.getAttributes();
  40. }
  41. /**
  42. * 连接出错时调用
  43. * @param session
  44. * @param exception
  45. * @throws Exception
  46. */
  47. @Override
  48. public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
  49. exception.printStackTrace();
  50. Map<String, Object> attributes = session.getAttributes();
  51. }
  52. /**
  53. * 连接关闭调用的方法
  54. */
  55. @Override
  56. public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
  57. }
  58. /**
  59. * 心跳
  60. * @param session
  61. * @param message
  62. * @throws Exception
  63. */
  64. @Override
  65. protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
  66. session.sendMessage(message);
  67. }
  68. @Override
  69. @Transactional(propagation = Propagation.REQUIRED)
  70. public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
  71. String payload = message.getPayload();
  72. if (StringUtils.isBlank(payload)) {
  73. return;
  74. }
  75. MessageDto messageDto = JSON.parseObject(payload, MessageDto.class);
  76. MessageService handlerByCode = messageFactory.getHandlerByCode(messageDto.getCode());
  77. List<String> ids = handlerByCode.sendMessage(messageDto);
  78. //对方session
  79. for (String id : ids) {
  80. WebSocketSession toSession = sessionMap.get(id);
  81. if(toSession!=null){
  82. sendMessage(messageDto,toSession);
  83. }
  84. }
  85. }
  86. /**
  87. * 发送系统消息给所有人系统消息
  88. * @param message
  89. * @throws Exception
  90. */
  91. public void sendAllMessage(TextMessage message) throws Exception {
  92. sessionMap.forEach( (key,value) ->{
  93. if (value.isOpen()) {
  94. try {
  95. value.sendMessage(message);
  96. } catch (IOException e) {
  97. throw new RuntimeException(e);
  98. }
  99. }
  100. });
  101. }
  102. /**
  103. * 给单个人系统发送消息
  104. * @param message
  105. */
  106. public void sendMessage(MessageDto message, WebSocketSession session) throws Exception {
  107. if (session.isOpen()) {
  108. session.sendMessage(new TextMessage(message.getContent()));
  109. }
  110. }
  111. }