Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions fastapi/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ def _check_data_exclusive(self) -> "ServerSentEvent":
return self


def _split_sse_lines(value: str) -> list[str]:
# Split on SSE-spec line terminators only (\n, \r\n, \r), preserving
# trailing empty strings.
return value.replace("\r\n", "\n").replace("\r", "\n").split("\n")


def format_sse_event(
*,
data_str: Annotated[
Expand Down Expand Up @@ -206,14 +212,14 @@ def format_sse_event(
lines: list[str] = []

if comment is not None:
for line in comment.splitlines():
for line in _split_sse_lines(comment):
lines.append(f": {line}")

if event is not None:
lines.append(f"event: {event}")

if data_str is not None:
for line in data_str.splitlines():
for line in _split_sse_lines(data_str):
lines.append(f"data: {line}")

if id is not None:
Expand Down
29 changes: 28 additions & 1 deletion tests/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
from fastapi import APIRouter, FastAPI
from fastapi.responses import EventSourceResponse
from fastapi.sse import ServerSentEvent
from fastapi.sse import ServerSentEvent, format_sse_event
from fastapi.testclient import TestClient
from pydantic import BaseModel

Expand Down Expand Up @@ -325,3 +325,30 @@ def test_no_keepalive_when_fast(client: TestClient):
assert response.status_code == 200
# KEEPALIVE_COMMENT is ": ping\n\n".
assert ": ping\n" not in response.text


@pytest.mark.parametrize(
("data", "expected_result"),
[
("Hello\n", b"data: Hello\ndata: \n\n"),
("Hello\n\n", b"data: Hello\ndata: \ndata: \n\n"),
("\n", b"data: \ndata: \n\n"),
("Hello\r\nWorld", b"data: Hello\ndata: World\n\n"),
("Hello\rWorld", b"data: Hello\ndata: World\n\n"),
("A\u2028B", "data: A\u2028B\n\n".encode()),
("A\vB", b"data: A\x0bB\n\n"),
],
)
def test_format_sse_event_splitlines_behavior_in_data(
data: str, expected_result: bytes
) -> None:
assert format_sse_event(data_str=data) == expected_result


def test_format_sse_event_splitlines_behavior_in_comment():
assert format_sse_event(comment="hi\n") == b": hi\n: \n\n"


def test_format_sse_event_keeps_empty_data_line():
payload = format_sse_event(data_str="")
assert payload == b"data: \n\n"
Loading