Real-Time Patterns
Baander provides two real-time transport mechanisms: WebSocket for bidirectional low-latency communication (party sync, room messaging), and Server-Sent Events (SSE) for unidirectional server-to-client streams (notifications, job monitoring). Both run on top of Swoole and share a common authentication approach.
WebSocket
WebSocket connections are managed through Swoole's process mode. The WebSocketController extends AbstractWebSocketController from SwooleBundle and handles the full connection lifecycle: open, message, close.
Connection Registry
WebSocketConnectionRegistry uses Swoole's shared-memory Table to track connections across all worker processes. Three tables are maintained:
| Table | Purpose | Key format |
|---|---|---|
connections |
Maps file descriptor to user identity and worker ID | {fd} |
roomMembers |
Maps room membership (for broadcasting) | {room}\0{fd} |
fdRooms |
Reverse index: which rooms a connection belongs to | {fd}\0{room} |
Each user is limited to 10 concurrent WebSocket connections. Orphaned entries (connections whose underlying TCP socket has closed) are cleaned up by cleanupOrphans(), which calls Swoole\Server::isEstablished() to verify the connection is still alive.
Message Pushing
WebSocketPusher provides three sending strategies:
| Method | Target | Use case |
|---|---|---|
pushToConnection(fd, payload) |
Single connection by file descriptor | Replies, error messages |
push(userId, payload) |
All connections for a user | User-scoped notifications |
broadcast(room, payload) |
All members of a room | Party events, room broadcasts |
All payloads are JSON-encoded before sending. Failed pushes (disconnected clients) are logged and silently skipped.
WebSocket Message Protocol
The controller dispatches messages by type field:
| Message type | Direction | Description |
|---|---|---|
connected |
Server to client | Sent on successful handshake, includes reconnectToken |
auth.reconnect |
Client to server | Restore identity with a previously issued token |
ping / pong |
Both | Keep-alive |
room.join / room.leave |
Client to server | Join or leave a named room |
party.join / party.leave |
Client to server | Join or leave a watch-party session |
party.playback |
Client to server | Play, pause, or seek (host only) |
party.sync |
Client to server | Report client position for drift correction |
party.sync_response |
Server to client | Server-adjusted position after sync |
party.member_event |
Server to room broadcast | Member joined or left |
error |
Server to client | Error response with message |
Rate limiting is enforced per connection: 30 messages per second. Exceeding this returns an error message and the excess messages are dropped.
Reconnection
On open, the server issues a reconnectToken (via ReconnectionTokenService). If the connection drops, the client can reconnect and send auth.reconnect with the token to restore its identity without re-authenticating through the full OAuth handshake. A new token is issued on each successful reconnection.
SSE (Server-Sent Events)
SSE provides unidirectional server-to-client streaming over HTTP. Baander uses SSE for two endpoints: job monitoring and notifications.
SSE Endpoints
| Endpoint | Authentication | Channel | Source |
|---|---|---|---|
/api/sse/events |
Admin only, ?token= |
Polls JobMonitorService |
Job status changes |
/api/notifications/sse |
Any user, ?token= |
Redis Pub/Sub notification:{userId} |
NotificationSseController |
Both endpoints enforce a maximum of 5 concurrent connections per user (tracked via Redis key with a 120-second TTL). Exceeding the limit returns HTTP 429.
Event Flow: Notifications
The notification SSE endpoint uses Redis Pub/Sub for true push delivery:
Notification publisher
|
v
Redis PUBLISH notification:{userId}
|
v
RedisPubSubConnection (blocking subscribe)
|
v
sendSseEvent() -> client
RedisPubSubConnection manages a dedicated Redis connection for subscribing (Redis does not allow publish and subscribe on the same connection). Messages are JSON-decoded and forwarded to the client as SSE events.
On connect, the client can pass a Last-Event-ID header. The server queries the database for any notifications with IDs greater than the provided value and replays them (up to 50) before subscribing to the live stream.
Event Flow: Job Monitoring
The job monitoring endpoint uses a polling loop rather than Pub/Sub:
- On connect, sends a
connectedevent with current job counts by status. - Every 3 seconds, polls
JobMonitorService::getRecent()for updated jobs. - Deduplicates by comparing
updatedAttimestamps against the last emitted event. - Sends heartbeats every 30 seconds to keep the connection alive.
SSE Event Format
All SSE events follow the standard format:
id: {event-id}
event: {event-type}
data: {json-payload}
Event types for job monitoring:
| Event | Trigger |
|---|---|
connected |
Initial connection with job counts |
job.queued |
Job moved to queued status |
job.started |
Job started running |
job.progress |
Job progress update |
job.completed |
Job finished successfully |
job.failed |
Job failed |
job.cancelled |
Job cancelled |
heartbeat |
Keep-alive (every 30s) |
Event types for notifications:
| Event | Trigger |
|---|---|
notification |
New or replayed notification |
heartbeat |
Keep-alive (every 30s) |
Party Sync Protocol
The Party context implements synchronized watch-party playback over WebSocket. The SyncedPartySession aggregate root manages playback state, and PlaybackSynchronizer handles drift correction between participants.
Playback State Model
| Field | Type | Description |
|---|---|---|
playbackState |
PlaybackState enum |
playing, paused, stopped |
wallClockPosition |
float |
Position (seconds) when playback started |
playbackStartedAt |
?DateTimeImmutable |
Timestamp when playback was last started or resumed |
pausedAtPosition |
?float |
Captured position when paused |
The current playback position is computed dynamically by getCurrentPosition():
currentPosition = wallClockPosition + (now - playbackStartedAt)
This wall-clock approach means the server does not need to store an incrementing counter. As long as the server clock is consistent, the position is always correct relative to the start time.
Playback Actions
Host-initiated actions are dispatched as CQRS commands via Messenger:
| Command | Handler | Effect |
|---|---|---|
StartPlaybackCommand |
StartPlaybackHandler |
Sets state to Playing, resets playbackStartedAt |
PausePlaybackCommand |
PausePlaybackHandler |
Captures current position into pausedAtPosition, sets Paused |
SeekPlaybackCommand |
SeekPlaybackHandler |
Sets wallClockPosition to target, resets start timestamp |
All three handlers dispatch a PlaybackPositionChanged domain event via EventDispatcherInterface, which the broader system can react to.
Sync Protocol
Non-host participants periodically report their local playback position to the server. The sync flow:
- Client sends
party.syncwith its current position and measured latency. SyncPlaybackHandlercallsSyncedPartySession::syncPlayback()on the aggregate root.- The aggregate root computes the server's current position via
getCurrentPosition(). - If drift exceeds
clientLatency + 1.0seconds, the server position is returned for correction. - If drift is within tolerance, the server position is returned as the authoritative position.
PlaybackSynchronizer(called from infrastructure) additionally updates per-member jitter compensation.
Jitter Compensation
Each PartyMember tracks a smoothed jitter value using exponential moving average (EMA):
EMA_ALPHA = 0.3
MAX_JITTER = 2.0
drift = |serverPosition - clientPosition|
jitter = min(drift, MAX_JITTER)
smoothedJitter = EMA_ALPHA * jitter + (1 - EMA_ALPHA) * previousJitter
The alpha of 0.3 gives more weight to recent measurements while smoothing out transient spikes. Jitter is capped at 2.0 seconds to prevent runaway values from a single large correction.
Seek Handling
When the host seeks, the flow is:
- Host sends
party.playbackwithaction: "seek"and apositionvalue. SeekPlaybackHandlercallssession.seekTo(position)on the aggregate root.- If playing,
wallClockPositionis set to the new position andplaybackStartedAtis reset tonow(). If paused,pausedAtPositionis updated instead. - A
PlaybackPositionChangedevent is dispatched. - All participants receive the update and seek their local player to the new position.
Authentication
Both WebSocket and SSE use OAuth 2.0 token authentication via query parameter. This is necessary because neither the browser EventSource API (SSE) nor the WebSocket handshake (in some client configurations) supports custom headers.
WsQueryTokenAuthenticator
Invoked during Swoole's onHandshake callback -- this is not a Symfony firewall authenticator. It builds a minimal Symfony Request from the Swoole request, injects the token as a Bearer header, and validates through the League OAuth2 ResourceServer. Returns the authenticated user's UUID string, or null on failure (which rejects the handshake).
SseQueryTokenAuthenticator
A Symfony firewall authenticator (AbstractAuthenticator) that matches requests to /api/sse/** with a token query parameter. It uses the same OAuth2 ResourceServer validation path as the standard header-based authenticator, extracting the user identifier and loading the SecurityUser from the repository. Returns a SelfValidatingPassport on success, or a JSON 401 on failure.
Both authenticators share the same underlying OAuth 2.0 validation pipeline. The only difference is the entry point: WebSocket authenticates at the Swoole handshake level (before Symfony), while SSE authenticates through the standard Symfony security firewall.
See Shared Kernel for Redis connection management and Swoole async primitives. See Architecture for the bounded context overview.