Emit Events with sink
private final Sinks.Many<String> eventSink = Sinks.many().multicast().directBestEffort();
@GetMapping(value = "/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse(@PathVariable("id") String id) {
final AtomicLong counter = new AtomicLong(0);
return eventSink.asFlux()
.filter(e -> e.equals(id))
.map(e -> {
System.out.println(Thread.currentThread().getName());
return ServerSentEvent.builder(e)
.id(counter.incrementAndGet() + "")
.event(e.toLowerCase() + "-" +
new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss").format(new java.util.Date()))
.build();
});
}
@ResponseStatus(HttpStatus.OK)
@ResponseBody
@GetMapping(path = "/send/{id}", produces = MediaType.TEXT_PLAIN_VALUE)
public String sendSomething(@PathVariable("id") String id) {
this.eventSink.emitNext(id,
(signalType, emitResult) -> {
System.out.println("Some event is being not send to all subscribers. It will vanish...");
// returning false, to not retry emitting given data again.
return false;
}
);
return "Have a look at /sse endpoint (using \"curl http://localhost/sse\" for example), to see events in realtime.";
}
Comments
Post a Comment