SSE

SSE

项目中遇到竞价功能,用户停留在竞价页面,服务端需要实时把竞价信息推送给每一个客户

什么是SSE

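SSE(Server-Sent Events,服务器发送事件)是一种服务器推送技术,允许服务器通过一个保持打开的 HTTP 连接实时向客户端发送数据流。它是一种轻量级的单向通信机制(只能由服务端推送给客户端),特别适合于实时性要求高的场景,如实时更新日志、实时新闻推送等。在Java中,Spring MVC 的 SseEmitter 依赖于 Servlet 3.0 及以上版本的异步支持,而 Spring WebFlux 则基于 Reactor 响应式流实现;借助这些库和框架,可以轻松地向客户端推送实时数据。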

业务上实现服务端到客户端的单向推送有以下两种方式:

| 特性 | Spring WebFlux + SSE | Spring MVC + SseEmitter |
| --- | --- | --- |
| 并发能力 | 高(单线程支持数千连接) | 低(受限于线程池大小) |
| 线程占用 | 无独占线程(EventLoop共享) | 每个推送占用线程池线程 |
| 代码复杂度 | 简单(内置广播机制) | 复杂(需手动管理连接和线程) |
| 适用场景 | 新项目/高并发需求 | 旧项目/低并发需求 |

Spring WebFlux + SSE 简单案例

1.接收器管理类

实际业务中,竞价不可能只是单一的一个物品竞价,可能同时存在多个物品竞价

ConcurrentMap<String, Sinks.Many<String>> dynamicSinks

每新增一个物品的竞价,map 中就添加一条记录(key 为该竞价对应的 sinkKey)

业务结束(某一个竞价结束)时调用 closeSink,通知订阅者流已结束并从 map 中移除对应的 Sink

package com.dem.framework.config.sse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * Registry of Reactor sinks used to push SSE messages, one sink per key.
 *
 * <p>Sinks are created on first access and removed again by {@link #closeSink(String)}.
 *
 * @program: dem-template-pc
 * @description: sink manager
 * @author: lcb
 * @create: 2025-04-01
 **/
@Component
public class DynamicSinkManager {
    /** Sinks currently registered, indexed by their unique key. */
    private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();

    /**
     * Returns the sink registered under {@code sinkKey}, creating and registering one if absent.
     *
     * @param sinkKey unique key of the sink
     * @return the existing or newly created sink
     * @author lcb
     */
    public Sinks.Many<String> getOrCreateSink(String sinkKey) {
        return dynamicSinks.computeIfAbsent(sinkKey, DynamicSinkManager::newSink);
    }

    /**
     * Removes the sink registered under {@code sinkKey} and signals completion on it.
     * Does nothing when no sink is registered under that key.
     *
     * @param sinkKey unique key of the sink to close
     * @author lcb
     */
    public void closeSink(String sinkKey) {
        Sinks.Many<String> removed = dynamicSinks.remove(sinkKey);
        if (removed == null) {
            return;
        }
        removed.tryEmitComplete();
    }

    /**
     * Returns the {@link Flux} view of the sink for {@code sinkKey}, creating the sink if absent.
     *
     * @param sinkKey unique key of the sink
     * @return flux backed by the sink
     */
    public Flux<String> getFlux(String sinkKey) {
        Sinks.Many<String> sink = getOrCreateSink(sinkKey);
        return sink.asFlux();
    }

    /** Builds a multicast, direct best-effort sink; the key is not needed to build it. */
    private static Sinks.Many<String> newSink(String ignoredKey) {
        return Sinks.many().multicast().directBestEffort();
    }
}
package com.dem.framework.config.sse;

import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Registry of Reactor sinks used to push SSE messages, one sink per key.
 *
 * <p>Sinks are created on first access and removed again by {@link #closeSink(String)}.
 *
 * @program: dem-template-pc
 * @description: sink manager
 * @author: lcb
 * @create: 2025-04-01
 **/
@Component
public class DynamicSinkManager {
    /** Sinks currently registered, indexed by their unique key. */
    private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();

    /**
     * Returns the sink registered under {@code sinkKey}, creating and registering one if absent.
     *
     * @param sinkKey unique key of the sink
     * @return the existing or newly created sink
     * @author lcb
     */
    public Sinks.Many<String> getOrCreateSink(String sinkKey) {
        return dynamicSinks.computeIfAbsent(sinkKey, DynamicSinkManager::newSink);
    }

    /**
     * Removes the sink registered under {@code sinkKey} and signals completion on it.
     * Does nothing when no sink is registered under that key.
     *
     * @param sinkKey unique key of the sink to close
     * @author lcb
     */
    public void closeSink(String sinkKey) {
        Sinks.Many<String> removed = dynamicSinks.remove(sinkKey);
        if (removed == null) {
            return;
        }
        removed.tryEmitComplete();
    }

    /**
     * Returns the {@link Flux} view of the sink for {@code sinkKey}, creating the sink if absent.
     *
     * @param sinkKey unique key of the sink
     * @return flux backed by the sink
     */
    public Flux<String> getFlux(String sinkKey) {
        Sinks.Many<String> sink = getOrCreateSink(sinkKey);
        return sink.asFlux();
    }

    /** Builds a multicast, direct best-effort sink; the key is not needed to build it. */
    private static Sinks.Many<String> newSink(String ignoredKey) {
        return Sinks.many().multicast().directBestEffort();
    }

}
package com.dem.framework.config.sse;

import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Registry of Reactor sinks used to push SSE messages, one sink per key.
 *
 * <p>Sinks are created on first access and removed again by {@link #closeSink(String)}.
 *
 * @program: dem-template-pc
 * @description: sink manager
 * @author: lcb
 * @create: 2025-04-01
 **/
@Component
public class DynamicSinkManager {
    /** Sinks currently registered, indexed by their unique key. */
    private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();

    /**
     * Returns the sink registered under {@code sinkKey}, creating and registering one if absent.
     *
     * @param sinkKey unique key of the sink
     * @return the existing or newly created sink
     * @author lcb
     */
    public Sinks.Many<String> getOrCreateSink(String sinkKey) {
        return dynamicSinks.computeIfAbsent(sinkKey, DynamicSinkManager::newSink);
    }

    /**
     * Removes the sink registered under {@code sinkKey} and signals completion on it.
     * Does nothing when no sink is registered under that key.
     *
     * @param sinkKey unique key of the sink to close
     * @author lcb
     */
    public void closeSink(String sinkKey) {
        Sinks.Many<String> removed = dynamicSinks.remove(sinkKey);
        if (removed == null) {
            return;
        }
        removed.tryEmitComplete();
    }

    /**
     * Returns the {@link Flux} view of the sink for {@code sinkKey}, creating the sink if absent.
     *
     * @param sinkKey unique key of the sink
     * @return flux backed by the sink
     */
    public Flux<String> getFlux(String sinkKey) {
        Sinks.Many<String> sink = getOrCreateSink(sinkKey);
        return sink.asFlux();
    }

    /** Builds a multicast, direct best-effort sink; the key is not needed to build it. */
    private static Sinks.Many<String> newSink(String ignoredKey) {
        return Sinks.many().multicast().directBestEffort();
    }
}

2.消息生产者

package com.dem.framework.config.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.List;
/**
 * SSE message producer: emits messages into, and exposes subscription streams from, the sinks
 * held by {@link DynamicSinkManager}.
 *
 * @program: dem-template-pc
 * @description: sse message producer
 * @author: lcb
 * @create: 2025-04-01
 **/
@Component
@Slf4j
public class MessageProducer {
    @Autowired
    private DynamicSinkManager dynamicSinkManager;

    /**
     * Subscribes in broadcast mode: returns the shared flux of the sink for {@code sinkKey},
     * creating the sink if it does not exist yet.
     *
     * @param sinkKey key of the sink to subscribe to
     * @return shared flux of the messages emitted on that sink
     * @author lcb
     */
    public Flux<String> asFluxShare(String sinkKey) {
        return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share();
    }

    /**
     * Emits a single message on the sink for {@code sinkKey}, creating the sink if absent.
     * A failed emission is passed to the failure handler instead of being thrown.
     *
     * @param sinkKey key of the target sink
     * @param message payload to emit
     */
    public void sendMessage(String sinkKey, String message) {
        Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message);
        if (result.isFailure()) {
            // Handle the failure according to business needs.
            handleEmitFailure(result, message);
        }
    }

    /**
     * Emits every message of the batch, in list order, via {@link #sendMessage(String, String)}.
     *
     * @param sinkKey  key of the target sink
     * @param messages payloads to emit; a {@code null} list is treated as empty
     */
    public void sendBatch(String sinkKey, List<String> messages) {
        if (messages == null) {
            return;
        }
        messages.forEach(msg -> sendMessage(sinkKey, msg));
    }

    /**
     * Closes the message stream for {@code sinkKey}.
     *
     * <p>Delegates to {@link DynamicSinkManager#closeSink(String)} so the sink is removed from the
     * registry as well as completed, and no sink is created just to be completed when the key is
     * unknown.
     *
     * @param sinkKey key of the sink to close
     */
    public void shutdown(String sinkKey) {
        dynamicSinkManager.closeSink(sinkKey);
    }

    /**
     * Failure hook for emissions; currently only logs the emit result (the message is not logged).
     *
     * @param result  the failed emit result
     * @param message the payload that could not be emitted
     */
    private void handleEmitFailure(Sinks.EmitResult result, String message) {
        // Implement your own failure handling here.
        log.error("消息发送失败,原因: {}", result);
    }
}
package com.dem.framework.config.sse;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.List;

/**
 * SSE message producer: emits messages into, and exposes subscription streams from, the sinks
 * held by {@link DynamicSinkManager}.
 *
 * @program: dem-template-pc
 * @description: sse message producer
 * @author: lcb
 * @create: 2025-04-01
 **/
@Component
@Slf4j
public class MessageProducer {
    @Autowired
    private DynamicSinkManager dynamicSinkManager;

    /**
     * Subscribes in broadcast mode: returns the shared flux of the sink for {@code sinkKey},
     * creating the sink if it does not exist yet.
     *
     * @param sinkKey key of the sink to subscribe to
     * @return shared flux of the messages emitted on that sink
     * @author lcb
     */
    public Flux<String> asFluxShare(String sinkKey) {
        return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share();
    }

    /**
     * Emits a single message on the sink for {@code sinkKey}, creating the sink if absent.
     * A failed emission is passed to the failure handler instead of being thrown.
     *
     * @param sinkKey key of the target sink
     * @param message payload to emit
     */
    public void sendMessage(String sinkKey, String message) {
        Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message);
        if (result.isFailure()) {
            // Handle the failure according to business needs.
            handleEmitFailure(result, message);
        }
    }

    /**
     * Emits every message of the batch, in list order, via {@link #sendMessage(String, String)}.
     *
     * @param sinkKey  key of the target sink
     * @param messages payloads to emit; a {@code null} list is treated as empty
     */
    public void sendBatch(String sinkKey, List<String> messages) {
        if (messages == null) {
            return;
        }
        messages.forEach(msg -> sendMessage(sinkKey, msg));
    }

    /**
     * Closes the message stream for {@code sinkKey}.
     *
     * <p>Delegates to {@link DynamicSinkManager#closeSink(String)} so the sink is removed from the
     * registry as well as completed, and no sink is created just to be completed when the key is
     * unknown.
     *
     * @param sinkKey key of the sink to close
     */
    public void shutdown(String sinkKey) {
        dynamicSinkManager.closeSink(sinkKey);
    }

    /**
     * Failure hook for emissions; currently only logs the emit result (the message is not logged).
     *
     * @param result  the failed emit result
     * @param message the payload that could not be emitted
     */
    private void handleEmitFailure(Sinks.EmitResult result, String message) {
        // Implement your own failure handling here.
        log.error("消息发送失败,原因: {}", result);
    }
}
package com.dem.framework.config.sse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.List; /** * @program: dem-template-pc * @description: sse 消息生产者 * @author: lcb * @create: 2025-04-01 **/ @Component @Slf4j public class MessageProducer { @Autowired private DynamicSinkManager dynamicSinkManager; /** * 订阅 广播模式 * @param sinkKey 订阅一个 * * @author lcb * @date 2025/4/1 15:26 * @return **/ public Flux<String> asFluxShare(String sinkKey) { return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share(); } // public void sendMessage(String sinkKey,String message) { Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message); // 处理发送结果 if (result.isFailure()) { // 根据业务需求处理失败情况 handleEmitFailure(result, message); } } // 批量发送 public void sendBatch(String sinkKey,List<String> messages) { messages.forEach(msg->sendMessage(sinkKey, msg)); } // 关闭消息流 public void shutdown(String sinkKey) { dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitComplete(); } private void handleEmitFailure(Sinks.EmitResult result, String message) { // 实现你的失败处理逻辑 log.error("消息发送失败,原因: {}", result); } }

3.后端测试入口

package com.dem.web.controller.business;
import com.dem.common.annotation.Anonymous;
import com.dem.framework.config.sse.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
/**
 * Test controller exposing the SSE subscription endpoint plus helper endpoints that simulate
 * publishing data and ending an auction.
 *
 * @program: dem-template-pc
 * @description: test
 * @author: lcb
 * @create: 2025-03-28 10:21
 **/
@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController {
    @Autowired
    private MessageProducer messageProducer;

    /**
     * Establishes the SSE connection with a client.
     *
     * @param sinkKey key of the message stream to subscribe to
     * @return event stream of the messages published for {@code sinkKey}
     * @author lcb
     */
    @GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> linkEvents(@PathVariable("sinkKey") String sinkKey) {
        // Path variable named explicitly, like addData, so binding does not rely on
        // parameter-name discovery.
        return messageProducer.asFluxShare(sinkKey);
    }

    /**
     * Simulates adding data: logs the value and publishes it to the stream for {@code sinkKey}.
     *
     * @param sinkKey key of the target message stream
     * @param str     payload to publish
     * @author lcb
     */
    @GetMapping("/addData/{sinkKey}/{str}")
    public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) {
        // Business processing would go here.
        log.info("数据:{}", str);
        messageProducer.sendMessage(sinkKey, str);
    }

    /**
     * Simulates the end of an auction by shutting down the stream for {@code sinkKey}.
     *
     * @param sinkKey key of the message stream to shut down
     * @author lcb
     */
    @GetMapping("/shutdown/{sinkKey}")
    public void guanb(@PathVariable("sinkKey") String sinkKey) {
        // The auction has ended.
        messageProducer.shutdown(sinkKey);
    }
}
package com.dem.web.controller.business;

import com.dem.common.annotation.Anonymous;
import com.dem.framework.config.sse.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;


/**
 * Test controller exposing the SSE subscription endpoint plus helper endpoints that simulate
 * publishing data and ending an auction.
 *
 * @program: dem-template-pc
 * @description: test
 * @author: lcb
 * @create: 2025-03-28 10:21
 **/
@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController {
    @Autowired
    private MessageProducer messageProducer;

    /**
     * Establishes the SSE connection with a client.
     *
     * @param sinkKey key of the message stream to subscribe to
     * @return event stream of the messages published for {@code sinkKey}
     * @author lcb
     */
    @GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> linkEvents(@PathVariable("sinkKey") String sinkKey) {
        // Path variable named explicitly, like addData, so binding does not rely on
        // parameter-name discovery.
        return messageProducer.asFluxShare(sinkKey);
    }

    /**
     * Simulates adding data: logs the value and publishes it to the stream for {@code sinkKey}.
     *
     * @param sinkKey key of the target message stream
     * @param str     payload to publish
     * @author lcb
     */
    @GetMapping("/addData/{sinkKey}/{str}")
    public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) {
        // Business processing would go here.
        log.info("数据:{}", str);
        messageProducer.sendMessage(sinkKey, str);
    }

    /**
     * Simulates the end of an auction by shutting down the stream for {@code sinkKey}.
     *
     * @param sinkKey key of the message stream to shut down
     * @author lcb
     */
    @GetMapping("/shutdown/{sinkKey}")
    public void guanb(@PathVariable("sinkKey") String sinkKey) {
        // The auction has ended.
        messageProducer.shutdown(sinkKey);
    }
}
package com.dem.web.controller.business; import com.dem.common.annotation.Anonymous; import com.dem.framework.config.sse.MessageProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; /** * @program: dem-template-pc * @description: 测试 * @author: lcb * @create: 2025-03-28 10:21 **/ @RestController @RequestMapping("/sse") @Slf4j public class SSEController { @Autowired private MessageProducer messageProducer; /** * 与客户端建立链接 * * @param sinkKey * @return * @author lcb * @date 2025/4/1 **/ @GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> linkEvents(@PathVariable String sinkKey) { return messageProducer.asFluxShare(sinkKey); } /** * 模拟添加数据 * * @param sinkKey * @param str * @return * @author lcb * @date 2025/4/1 15:50 **/ @GetMapping("/addData/{sinkKey}/{str}") public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) { /* * 处理业务。。。 * */ log.info("数据:{}", str); messageProducer.sendMessage(sinkKey, str); } /** * 模拟竞价结束 * * @param sinkKey * @return * @author lcb * @date 2025/4/1 15:50 **/ @GetMapping("/shutdown/{sinkKey}") public void guanb(@PathVariable String sinkKey) { /* * 竞价结束 * */ messageProducer.shutdown(sinkKey); } }

4.测试

  • 前端代码

    // Open the SSE connection for the stream keyed "dem123".
    const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123');
    eventSource.onmessage = ({ data }) => {
      // Handle the received data here, e.g. update the UI.
      console.log('Received:', data);
    };
    // Open the SSE connection for the stream keyed "dem123".
    const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123');
    eventSource.onmessage = ({ data }) => {
      // Handle the received data here, e.g. update the UI.
      console.log('Received:', data);
    };
    // Open the SSE connection for the stream keyed "dem123".
    const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123');
    eventSource.onmessage = function (event) {
      // Handle the received data here, e.g. update the UI.
      // (Kept on its own line: on a single line this comment swallowed the closing "};".)
      console.log('Received:', event.data);
    };
  • 也可使用浏览器直接请求

其他参数

  1. Sinks.many():

    • 创建一个可以发射多个元素的 Sink (接收器)
    • 与 Sinks.one()(单元素)和 Sinks.empty()(无元素)相对
  2. .multicast():

    • 指定这是一个多播 Sink
    • 意味着多个订阅者可以订阅同一个源
    • 与 .unicast()(单播)相对
  3. .directBestEffort():

    因为这个业务是 竞价,后来订阅的用户只需要获取最新的数据即可

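    如果后来订阅的用户需要获取之前的历史数据,可以参考:

    .replay():Sinks.many().replay()(如 .replay().all()、.replay().limit(n)、.replay().latest())会把历史数据重放给后来的订阅者

    .onBackpressureBuffer():Sinks.many().multicast().onBackpressureBuffer() 只缓冲第一个订阅者到来之前发出的数据,之后加入的订阅者只能收到订阅之后的新数据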

    • direct: 表示直接传递元素,不进行任何缓冲
    • bestEffort: 表示“尽力而为”的传递策略,如果某个下游订阅者跟不上,可能会对该订阅者丢弃元素

原文链接:SSE

© 版权声明
THE END
喜欢就支持一下吧
点赞7 分享
Happiness isn't about getting what you want all the time, it's about loving what you have.
幸福并不是一味得到自己想要的,而是珍爱自己拥有的
评论 抢沙发

请登录后发表评论

    暂无评论内容