WebSocket+RabbitMQ整合实现消息实时推送

/ java / 没有评论 / 72浏览

WebSocket+RabbitMQ整合实现消息实时推送

配置

maven导包

<!--WebSocket-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!--RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

# RabbitMQ 配置
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启消息发送确认
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual

编写RabbitMQ服务端

配置类


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author kite
 * @version 1.0
 * @date 2020/8/3 16:07
 */
@Configuration
public class RabbitConfig {

    //websocket 消息队列
    public static final String msg_queue = "msg_queue";

    //消息交换机
    public static final String msg_exchang = "msg_exchang";

    //消息路由键
    public static final String msg_routing_key = "msg_routing_key";

    /**
     * 消息队列
     * @return
     */
    @Bean
    public Queue msgQueue(){
        return new Queue(msg_queue);
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(msg_exchang);
    }

    /**
     * 消息队列绑定消息交换机
     * @return
     */
    @Bean
    public Binding msgBinding(){
        return BindingBuilder.bind(msgQueue()).to(directExchange()).with(msg_routing_key);
    }
}

消息提供者


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @author kite
 * @version 1.0
 * @date 2020/8/3 16:08
 */
@Slf4j
@Component
public class RabbitProduct {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //发送消息
    public void sendMSG(String msg){
        log.info("----------------发送消息--------------------" + msg);
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //把消息对象放入路由对应的队列当中去
        rabbitTemplate.convertAndSend(RabbitConfig.msg_exchang, RabbitConfig.msg_routing_key, msg, correlationId);
    }

}

消息消费者


import com.jianxun.rabbitmqwebsocket.websocket.WebSocket;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author kite
 * @version 1.0
 * @date 2020/8/3 16:09
 */
@Slf4j
@Component
public class RabbitConsumer {

    @RabbitListener(queues = RabbitConfig.msg_queue) //监听队列
    public void msgReceive(String content, Message message, Channel channel) throws IOException {
        log.info("----------------接收到消息--------------------" + content);
        //发送给WebSocket 由WebSocket推送给前端
        WebSocket.sendAll(content);
        // 确认消息已接收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

WebSocket配置


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author kite
 * @version 1.0
 * @date 2020/8/3 16:09
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

WebSocket


import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ServerEndpoint("/webSocket/{userId}")
@Component
public class WebSocket {

	// 与某个客户端的连接会话,需要通过它来给客户端发送数据
	private Session session;
	private static Map<String, Session> sessionPool = new HashMap<String, Session>();
	private static Map<String, String> sessionIds = new HashMap<String, String>();

	/**
	 * 用户连接时触发
	 * @param session
	 * @param userId
	 * @throws Exception 
	 * @throws NumberFormatException 
	 */
	@OnOpen
	public void onOpen(Session session, @PathParam(value = "userId") String userId) {
		this.session = session;
		sessionPool.put(userId, session);
		sessionIds.put(session.getId(), userId);
		System.out.println("有新连接加入!id为:" + userId + ",当前在线人数为" + sessionPool.size());
	}

	/**
	 * 收到信息时触发
	 * @param message
	 */
	@OnMessage
	public void onMessage(String message) {
		System.out.println("当前发送人sessionid为" + session.getId() + "发送内容为" + message);
	}

	/**
	 * 连接关闭触发
	 */
	@OnClose
	public void onClose() {
		sessionPool.remove(sessionIds.get(session.getId()));
		sessionIds.remove(session.getId());
	}

	/**
	 * 发生错误时触发
	 * @param session
	 * @param error
	 */
	@OnError
	public void onError(Session session, Throwable error) {
		error.printStackTrace();
	}

	/**
	 * 信息发送的方法
	 * @param message
	 * @param userId
	 */
	public static void sendMessage(String message, String userId) {
		Session s = sessionPool.get(userId);
		if (s != null) {
			try {
				System.out.println("当前发送对象Id为" + userId + ",发送内容为" + message);
				s.getBasicRemote().sendText(message);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 获取当前连接数
	 * @return
	 */
	public static int getOnlineNum() {
		return sessionPool.size();
	}

	/**
	 * 获取在线用户名以逗号隔开
	 * @return
	 */
	public static String getOnlineUsers() {
		StringBuffer users = new StringBuffer();
		for (String key : sessionIds.keySet()) {
			users.append(sessionIds.get(key) + ",");
		}
		return users.toString();
	}

	/**
	 * 信息群发
	 * @param msg
	 */
	public static void sendAll(String msg) {
		for (String key : sessionIds.keySet()) {
			sendMessage(msg, sessionIds.get(key));
		}
	}

	/**
	 * 多个人发送给指定的几个用户
	 * @param msg
	 * @param persons 用户s
	 */

	public static void SendMany(String msg, String[] persons) {
		for (String userid : persons) {
			sendMessage(msg, userid);
		}
	}

	/**
	 * 多个人发送给指定的几个用户
	 * @param msg
	 * @param persons 用户s
	 */

	public static void SendMany(String msg, List<String> persons) {
		for (String userid : persons) {
			sendMessage(msg, userid);
		}
	}

}

Controller


import com.jianxun.rabbitmqwebsocket.rabbitmq.RabbitProduct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @author kite
 * @version 1.0
 * @date 2020/8/3 16:05
 */
@Controller
public class WebController {

    @Autowired
    RabbitProduct rabbitProduct;

    @RequestMapping("/{view}")
    public String html(@PathVariable("view")String view){
        return view;
    }

    @ResponseBody
    @RequestMapping("/send")
    public void send(String msg){
        rabbitProduct.sendMSG(msg);
    }
}

前端html


<script type="text/javascript">
    var websocket;

    // 首先判断是否 支持 WebSocket
    if('WebSocket' in window) {

        websocket = new WebSocket("ws://localhost:8080/webSocket/1");
    } else if('MozWebSocket' in window) {
        websocket = new MozWebSocket("ws://localhost:8080/webSocket/1");
    } else {
        websocket = new SockJS("ws://localhost:8080/webSocket/1");
    }

    // 打开连接时
    websocket.onopen = function(event) {
        console.log(" websocket.onopen  ");
    };

    // 收到消息时
    websocket.onmessage = function(event) {
        console.log("收到一条消息"+event.data);
        alert(event.data);
    };

    websocket.onerror = function(event) {
        console.log("  websocket.onerror  ");
    };

    websocket.onclose = function(event) {
        console.log("  websocket.onclose  ");
    };
</script>

demo

http://git.wingsky.net/kite/rabbitmq-websocket