-
-
Save atinux/05836469acca9649fa2b9e865df898a2 to your computer and use it in GitHub Desktop.
// ~/server/api/sse.ts | |
export default defineEventHandler(async (event) => { | |
if (!process.dev) return { disabled: true } | |
// Enable SSE endpoint | |
setHeader(event, 'cache-control', 'no-cache') | |
setHeader(event, 'connection', 'keep-alive') | |
setHeader(event, 'content-type', 'text/event-stream') | |
setResponseStatus(event, 200) | |
let counter = 0 | |
const sendEvent = (data: any) => { | |
event.node.res.write(`id: ${++counter}\n`); | |
event.node.res.write(`data: ${JSON.stringify(data)}\n\n`); | |
} | |
myHooks.hook('sse:event', sendEvent) | |
// Let the connection opened | |
event._handled = true; | |
}) |
Exactly @hazadus
Thanks @atinux! 👍
@masyoudi Would you please open an issue in nitro repo with reproduction? 🙏 (planning to support SSE in standard layer but it might be tricky in cases like close event at the moment)
okay sir @pi0
hi sir @atinux, can you help me, i'm trying to listen
close
event. but didn't workevent.node.req.on('close', () => { /* do remove client */ });this is the complete implementation i was built : link
Hi @masyoudi, @pi0. After some digging, I think the issue is in 'httpxy' proxy used by nitro during dev server, and the responds did not get 'abort/destroy' when the client close the request, adding few lines of code in httpxy/src/middleware/web-incoming.ts
solve the issue, and the event.node.req.on('close', () => { });
event will be trigger.
which this issue is only happen during dev (dev server), where the 'close' event is already been trigger in build.
if (res.finished) {
if (server) {
server.emit("end", req, res, proxyRes);
}
} else {
+ res.on('close', () => {
+ proxyRes.destroy()
+ });
// Allow us to listen when the proxy has completed
proxyRes.on("end", function () {
if (server) {
server.emit("end", req, res, proxyRes);
}
});
// We pipe to the response unless its expected to be handled by the user
if (!options.selfHandleResponse) {
proxyRes.pipe(res);
}
}
I'll make a PR to unjs/httpxy if that looks good to you, or maybe I'm missing something (not very familiar with this proxy lib)
related discussion: nuxt/nuxt/discussions/16046
similar issue in upstream fork: http-party/node-http-proxy/issues/1520
Nice find @didavid61202 PR is more than welcome 🙏
I wanted to expand on this and provide a simple to use, drop in implementation for something like this:
import { H3Event } from "h3"
import { createHooks } from "hookable"
export interface ServerSentEvent {
[key: string]: <T, R>(data: T) => R | void
}
const sseHooks = createHooks<ServerSentEvent>()
export const useSSE = (event: H3Event, hookName: string) => {
setHeader(event, 'content-type', 'text/event-stream')
setHeader(event, 'cache-control', 'no-cache')
setHeader(event, 'connection', 'keep-alive')
setResponseStatus(event, 200)
let id = 0
sseHooks.hook(hookName, (data: any) => {
event.node.res.write(`id: ${id += 1}\n`)
event.node.res.write(`data: ${JSON.stringify(data)}\n\n`)
event.node.res.flushHeaders()
})
const send = (callback: (id: number) => any) => {
sseHooks.callHook(hookName, callback(id))
}
const close = () => {
event.node.res.end()
}
event._handled = true
event.node.req.on("close", close)
return { send, close }
}
Here is also a simple example of how you might use it:
export default defineEventHandler(async (event) => {
const { send, close } = useSSE(event, "sse:event")
let interval = setInterval(() => {
send((id) => ({ id, message: "Hello!"}))
}, 1000)
event.node.req.on("close", () => clearInterval(interval))
})
For a plain Nitro implementation using ReadableStream see:
i.e.
// file: src/server/event-stream.ts
import { IncomingMessage } from 'node:http';
import { ReadableStream } from 'node:stream/web';
import { TextEncoder } from 'node:util';
export type SourceController = {
send: (data: string, id?: string) => void;
close: () => void;
};
type InitSource = (controller: SourceController) => () => void;
function makeEventStream(request: IncomingMessage, init: InitSource) {
// listen to the request closing ASAP
let cleanup: (() => void) | undefined;
let closeStream: (() => void) | undefined;
let onClientClose = () => {
if (onClientClose) {
request.removeListener('close', onClientClose);
onClientClose = undefined;
}
closeStream?.();
};
request.addListener('close', onClientClose);
return new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const send = (data: string, id?: string) => {
const payload = (id ? 'id:' + id + '\ndata:' : 'data:') + data + '\n\n';
controller.enqueue(encoder.encode(payload));
};
closeStream = () => {
if (!cleanup) return;
cleanup();
cleanup = undefined;
controller.close();
};
cleanup = init({ send, close: closeStream });
if (!onClientClose) {
// client closed request early
closeStream();
return;
}
},
});
}
export { makeEventStream };
A huge Thank YOU @atinux for this gist!
Official support:
Thanks @pi0, I wasn't aware that there is official support for this. One note though - shouldn't server-sent events have its own category instead of being placed under websockets#server-sent-events-sse
? SSE has nothing to do with websockets.
They are often interchangeable solutions and SSE is often a fallback approach when WebSocket usage is not possible. However consider SSE has some limitations too like opening multiple connections, might get browser request pending.
@pi0 The example seems to be missing on h3 page for SSE
@martinszeltins thanks for notice Should be good now (make sure to clear your browser cache). Also if you spotted any issues re docs or usage, feel free to directly open an issue in h3 repo.
Thanks @pi0! It would be nice to have an example in the Nitro docs too how to create a SSE route. There is an example for websockets route but not for SSE.
Suew we can add an example there too. Nitro usage is exactly same as h3 one BTW.
(please prefer to use GitHub issues and discussions for suggestions, I'm worried our comments here are off-topics and send will notifications to everyone following this gist 🙏 )
Regardless of the given example, I would like to ask question on,
How can I consume SSE on Nuxt client? Is there any simple API like this.$sse.on('tweet', (tweet) => this.tweets.unshift(tweet))
mentioned here.
Also, there can be cases where we've to make a POST req that responds with SSE and it might include JSON and not just plain string.
Regardless of the given example, I would like to ask question on,
How can I consume SSE on Nuxt client? Is there any simple API like
this.$sse.on('tweet', (tweet) => this.tweets.unshift(tweet))
mentioned here.Also, there can be cases where we've to make a POST req that responds with SSE and it might include JSON and not just plain string.
@jd-solanki you can use the useEventSource
from vueuse library - https://vueuse.org/core/useEventSource/
Yes, I can but it doesn't support POST request. I'm making POST request and in response, I'm getting SSE response.
Yes, I can but it doesn't support POST request. I'm making POST request and in response, I'm getting SSE response.
i think you need event emitter
@masyoudi Sorry I didn't get what you mean? What's Event Emitter related to SSE? Any example you can provide?
Yes, I can but it doesn't support POST request. I'm making POST request and in response, I'm getting SSE response.
@jd-solanki I think we're spamming this gist with notifications. But to answer your question, you do not make POST requests to SSE endpoint. SSE is for receiving events from the server. If you want to send data to the backend, you would use a different endpoint (graphql, rest etc.). And then whenever the backend has something new to say, it will stream a message to the client using the established SSE/EventSource connection. Feel free to provide a code example of what you're trying to do.
there is no relation between SSE with Event Emitter
as you can see SSE only aplicable on request method GET, so when you need to trigger the EventSource from request method POST, you need different approach
so in my opinion, i need to create function outside SSE route that can be trigger the EventSource, in this case i can use Event Emitter / hookable, and then inside POST route i can trigger the Event Emitter
i have a simple example in here
As @martinszeltins Says we shouldn't discuss it here. Let's not spam the gist here. Thanks for informing 🤝🏻
If anyone has something to say, I've seperate issue here.
Hey, @atinux I can't seem to get it to work on Cloudflare. Sending events from setInterval
works fine but sending from hooks doesn't seem to work. Am I missing something?
Why would one use this and not h3's createEventStream
? See h3 docs: https://h3.unjs.io/guide/websocket#server-sent-events-sse.
Example modified for Nuxt:
export default defineEventHandler(async (event) => {
const eventStream = createEventStream(event);
// Send a message every second
const interval = setInterval(async () => {
await eventStream.push('Hello world');
}, 1000);
// Cleanup the interval when the connection is terminated or the writer is closed
eventStream.onClosed(() => {
clearInterval(interval);
});
return eventStream.send();
});
Why would one use this
This topic started back in 2023-04-10 and createEventStream
was only added months later to h3 (unjs/h3#586)
Thanks for the clarification @peerreynders!
Thanks for your prompt answer @atinux!
So, if I want to send data via SSE from my code, I need to call
myHooks.callHook("sse:event", { message: "Hello!" })
?