Documentation Index
Fetch the complete documentation index at: https://cloud-docs.mentra.glass/llms.txt
Use this file to discover all available pages before exploring further.
Overview
VideoManager handles RTMP (Real-Time Messaging Protocol) streaming within a user session. It manages the lifecycle of video streams, tracks stream health through keep-alive mechanisms, handles acknowledgments, and ensures reliable communication between apps and smart glasses for video streaming.
File: packages/cloud/src/services/session/VideoManager.ts
Key Features
- Stream Lifecycle Management: Tracks streams from initialization to termination
- Keep-Alive Mechanism: Maintains stream health with periodic heartbeats
- ACK Tracking: Monitors acknowledgments to detect connection issues
- Conflict Management: Prevents conflicts with managed streaming sessions
- Multi-Stream Support: Handles multiple concurrent streams per session
- Status Broadcasting: Notifies apps of stream status changes
- Timeout Detection: Automatically cleans up stale streams
Architecture
Stream Management
interface SessionStreamInfo {
streamId: string;
packageName: string;
rtmpUrl: string;
status: 'initializing' | 'active' | 'stopping' | 'stopped' | 'timeout';
startTime: Date;
lastKeepAlive: Date;
keepAliveTimer?: NodeJS.Timeout;
pendingAcks: Map<string, { sentAt: Date; timeout: NodeJS.Timeout; }>;
missedAcks: number;
options: {
video?: VideoConfig;
audio?: AudioConfig;
stream?: StreamConfig;
};
}
Stream Creation
async startRtmpStream(request: RtmpStreamRequest): Promise<string> {
const { packageName, rtmpUrl, video, audio, stream: streamOptions } = request;
// Validation
if (!this.userSession.appManager.isAppRunning(packageName)) {
throw new Error(`App ${packageName} is not running`);
}
if (!rtmpUrl || (!rtmpUrl.startsWith('rtmp://') && !rtmpUrl.startsWith('rtmps://'))) {
throw new Error('Invalid RTMP URL');
}
if (!this.userSession.websocket?.readyState === WebSocket.OPEN) {
throw new Error('Glasses WebSocket not connected');
}
// Check for managed stream conflicts
if (this.userSession.managedStreamingExtension.checkUnmanagedStreamConflict(this.userSession.userId)) {
throw new Error('Cannot start unmanaged stream - managed stream already active');
}
// Generate short ID for BLE efficiency
const streamId = `s${Date.now().toString(36).slice(-6)}${Math.random().toString(36).slice(2, 6)}`;
// Stop existing streams for this app
this.stopStreamsByPackageName(packageName);
// Create stream info
const streamInfo: SessionStreamInfo = {
streamId,
packageName,
rtmpUrl,
status: 'initializing',
startTime: now,
lastKeepAlive: now,
pendingAcks: new Map(),
missedAcks: 0,
options: { video, audio, stream: streamOptions }
};
this.activeSessionStreams.set(streamId, streamInfo);
this.scheduleKeepAlive(streamId);
// Send start command
const startMessage: StartRtmpStream = {
type: CloudToGlassesMessageType.START_RTMP_STREAM,
sessionId: this.userSession.sessionId,
rtmpUrl,
appId: packageName,
streamId,
video: video || {},
audio: audio || {},
stream: streamOptions || {},
timestamp: now
};
this.userSession.websocket.send(JSON.stringify(startMessage));
return streamId;
}
Keep-Alive Mechanism
Keep-Alive Configuration
const KEEP_ALIVE_INTERVAL_MS = 15000; // 15 seconds
const STREAM_TIMEOUT_MS = 60000; // 60 seconds
const ACK_TIMEOUT_MS = 5000; // 5 seconds
const MAX_MISSED_ACKS = 3; // Max consecutive missed ACKs
Keep-Alive Scheduling
private scheduleKeepAlive(streamId: string): void {
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
// Cancel existing timer
if (stream.keepAliveTimer) {
clearInterval(stream.keepAliveTimer);
}
// Schedule periodic keep-alive
stream.keepAliveTimer = setInterval(() => {
this.sendKeepAlive(streamId);
}, KEEP_ALIVE_INTERVAL_MS);
}
Keep-Alive Message
private sendKeepAlive(streamId: string): void {
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
// Check stream status
if (!['initializing', 'active'].includes(stream.status)) {
this.stopTracking(streamId);
return;
}
// Check timeout
const timeSinceLastActivity = Date.now() - stream.lastKeepAlive.getTime();
if (timeSinceLastActivity > STREAM_TIMEOUT_MS) {
this.updateStatus(streamId, 'timeout');
return;
}
// Generate short ACK ID for BLE
const ackId = `a${Date.now().toString(36).slice(-5)}`;
this.trackKeepAliveAck(streamId, ackId);
// Send minimal keep-alive message
const keepAliveMsg: KeepRtmpStreamAlive = {
type: CloudToGlassesMessageType.KEEP_RTMP_STREAM_ALIVE,
streamId,
ackId
};
this.userSession.websocket.send(JSON.stringify(keepAliveMsg));
}
ACK Tracking
ACK Management
private trackKeepAliveAck(streamId: string, ackId: string): void {
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
const timeout = setTimeout(() => {
this.handleMissedKeepAliveAck(streamId, ackId);
}, ACK_TIMEOUT_MS);
stream.pendingAcks.set(ackId, {
sentAt: new Date(),
timeout
});
}
handleKeepAliveAck(ackMessage: KeepAliveAck): void {
const { streamId, ackId } = ackMessage;
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
const pendingAck = stream.pendingAcks.get(ackId);
if (pendingAck) {
clearTimeout(pendingAck.timeout);
stream.pendingAcks.delete(ackId);
stream.missedAcks = 0; // Reset on successful ACK
stream.lastKeepAlive = new Date();
}
}
Missed ACK Handling
private handleMissedKeepAliveAck(streamId: string, ackId: string): void {
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
if (stream.pendingAcks.has(ackId)) {
stream.pendingAcks.delete(ackId);
stream.missedAcks++;
if (stream.missedAcks >= MAX_MISSED_ACKS) {
this.logger.error({ streamId, missedAcks: stream.missedAcks },
'Too many missed ACKs, timing out stream');
this.updateStatus(streamId, 'timeout');
}
}
}
Status Management
Status Updates from Glasses
handleRtmpStreamStatus(statusMessage: RtmpStreamStatus): void {
const { streamId, status } = statusMessage;
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
// Update activity time
stream.lastKeepAlive = new Date();
// Map glasses status to internal status
let mappedStatus: SessionStreamInfo['status'];
switch (status) {
case 'initializing':
case 'connecting':
case 'reconnecting':
mappedStatus = 'initializing';
break;
case 'active':
case 'streaming':
mappedStatus = 'active';
break;
case 'stopping':
mappedStatus = 'stopping';
break;
case 'stopped':
case 'disconnected':
mappedStatus = 'stopped';
break;
case 'error':
case 'timeout':
mappedStatus = 'timeout';
break;
default:
return; // Ignore unknown statuses
}
this.updateStatus(streamId, mappedStatus);
}
Status Broadcasting
private async sendStreamStatusToApp(
streamId: string,
status: RtmpStreamStatus['status'],
errorDetails?: string,
stats?: RtmpStreamStatus['stats']
): Promise<void> {
const streamInfo = this.activeSessionStreams.get(streamId);
const packageName = streamInfo ? streamInfo.packageName : "unknown_package_owner";
// Direct message to owning app
const appOwnerMessage = {
type: CloudToAppMessageType.RTMP_STREAM_STATUS,
sessionId: `${this.userSession.sessionId}-${packageName}`,
streamId,
status,
errorDetails,
stats,
appId: packageName,
timestamp: new Date()
};
// Send using AppManager for resurrection support
const result = await this.userSession.appManager.sendMessageToApp(
packageName,
appOwnerMessage
);
// Broadcast to other subscribed apps
const broadcastPayload: RtmpStreamStatus = {
type: GlassesToCloudMessageType.RTMP_STREAM_STATUS,
sessionId: this.userSession.sessionId,
streamId,
status,
errorDetails,
appId: packageName,
stats,
timestamp: new Date()
};
sessionService.relayMessageToApps(this.userSession, broadcastPayload);
}
Stream Termination
Stop Stream Request
async stopRtmpStream(request: RtmpStreamStopRequest): Promise<void> {
const { packageName, streamId } = request;
if (streamId) {
// Stop specific stream
const stream = this.activeSessionStreams.get(streamId);
if (stream && stream.packageName === packageName) {
this.updateStatus(streamId, 'stopped');
} else if (stream) {
throw new Error(`App ${packageName} cannot stop stream owned by ${stream.packageName}`);
}
} else {
// Stop all streams for this package
this.stopStreamsByPackageName(packageName);
}
// Send stop command to glasses
if (this.userSession.websocket?.readyState === WebSocket.OPEN) {
const stopMessage: StopRtmpStream = {
type: CloudToGlassesMessageType.STOP_RTMP_STREAM,
sessionId: this.userSession.sessionId,
appId: packageName,
streamId: streamId || '',
timestamp: new Date()
};
this.userSession.websocket.send(JSON.stringify(stopMessage));
}
}
Resource Cleanup
stopTracking(streamId: string): void {
const stream = this.activeSessionStreams.get(streamId);
if (!stream) return;
// Cancel keep-alive timer
if (stream.keepAliveTimer) {
clearInterval(stream.keepAliveTimer);
}
// Cancel all pending ACK timeouts
for (const [, ackInfo] of stream.pendingAcks) {
clearTimeout(ackInfo.timeout);
}
stream.pendingAcks.clear();
this.activeSessionStreams.delete(streamId);
}
Configuration Options
Video Configuration
interface VideoConfig {
width?: number;
height?: number;
bitrate?: number;
fps?: number;
codec?: string;
}
Audio Configuration
interface AudioConfig {
sampleRate?: number;
channels?: number;
bitrate?: number;
codec?: string;
}
Stream Configuration
interface StreamConfig {
reconnectAttempts?: number;
reconnectDelay?: number;
bufferSize?: number;
}
Lifecycle Management
Disposal
dispose(): void {
this.logger.info('Disposing VideoManager');
// Stop all active streams
const streamIdsToStop = Array.from(this.activeSessionStreams.keys());
streamIdsToStop.forEach(streamId => {
this.stopTracking(streamId);
});
this.activeSessionStreams.clear();
}
Best Practices
- Use short IDs for BLE efficiency (streamId, ackId)
- Validate RTMP URLs before attempting connection
- Check for conflicts with managed streaming sessions
- Stop existing streams when starting new ones for same app
- Monitor ACKs closely to detect connection issues early
- Clean up resources properly on stream termination
- Broadcast status changes to keep apps informed
Integration Points
- AppManager: Validates app state and sends messages
- ManagedStreamingExtension: Checks for streaming conflicts
- WebSocket: Communicates with smart glasses
- SessionService: Broadcasts status to subscribed apps