Documentation Index
Fetch the complete documentation index at: https://cloud-docs.mentra.glass/llms.txt
Use this file to discover all available pages before exploring further.
Overview
ManagedStreamingExtension works alongside VideoManager to provide managed streaming capabilities. It enables multiple apps to view the same stream, handles Cloudflare Live streaming infrastructure, supports restreaming to multiple destinations, and provides automatic URL discovery for HLS/DASH/WebRTC playback.
File: packages/cloud/src/services/streaming/ManagedStreamingExtension.ts
Key Features
- Multi-Viewer Support: Multiple apps can view the same stream
- Cloudflare Integration: Uses Cloudflare Live for stream infrastructure
- Restream Capabilities: Add/remove multiple RTMP output destinations
- Automatic URL Discovery: Polls for and distributes playback URLs
- Conflict Management: Prevents conflicts between managed/unmanaged streams
- Keep-Alive System: Maintains stream health with ACK tracking
- Per-App Output Control: Apps manage their own restream outputs
Architecture
Stream Management
Managed Stream State
/**
 * State tracked for one managed stream.
 */
interface ManagedStreamState {
  /** Identifier of this stream. */
  streamId: string;
  /** Identifier of the user the stream belongs to. */
  userId: string;
  /** Discriminator marking this state as a managed stream. */
  type: 'managed';
  /** Cloudflare Live Input ID. */
  cfLiveInputId: string;
  /** RTMP ingest URL. */
  cfIngestUrl: string;
  /** App package names currently viewing the stream. */
  activeViewers: Set<string>;
  /** HLS playback URL. */
  hlsUrl?: string;
  /** DASH playback URL. */
  dashUrl?: string;
  /** WebRTC playback URL. */
  webrtcUrl?: string;
  /** Restream outputs attached to the stream. */
  outputs?: {
    /** Cloudflare identifier of the output. */
    cfOutputId: string;
    /** Destination URL of the output. */
    url: string;
    /** Optional display name of the output. */
    name?: string;
    /** App that added it. */
    addedBy: string;
    /** Output object as reported by Cloudflare. */
    status?: CloudflareOutput;
  }[];
  createdAt: Date;
  lastActivity: Date;
}
Stream Creation
/**
 * Starts a managed stream for an app, or joins the app to the user's
 * existing managed stream.
 *
 * When the user already has a managed stream, the app is registered through
 * `createOrJoinManagedStream` and that stream's id is returned without
 * creating a new Cloudflare live input. Otherwise a live input is created,
 * keep-alive is started, the glasses are told to publish to the Cloudflare
 * ingest URL, and playback-URL polling begins.
 *
 * @param userSession Session of the user whose glasses will stream.
 * @param request Stream request (app package name, quality and media options).
 * @returns The id of the managed stream the app is now attached to.
 * @throws Error if the app is not running or the state manager reports a
 *   stream conflict.
 */
async startManagedStream(
  userSession: UserSession,
  request: ManagedStreamRequest
): Promise<string> {
  // NOTE(review): `restreamDestinations` is assumed to be a field of
  // ManagedStreamRequest; it was referenced below without being declared.
  const {
    packageName,
    quality,
    enableWebRTC,
    video,
    audio,
    restreamDestinations
  } = request;
  // NOTE(review): `userId` was used throughout without being declared;
  // assumes UserSession exposes the owning user's id as `userId` — confirm.
  const userId = userSession.userId;

  // Validation
  if (!userSession.appManager.isAppRunning(packageName)) {
    throw new Error(`App ${packageName} is not running`);
  }

  // Check conflicts
  const conflict = this.stateManager.checkStreamConflict(userId, "managed");
  if (conflict.hasConflict) {
    throw new Error(conflict.message);
  }

  // Check for existing stream
  const existingStream = this.stateManager.getStreamState(userId);
  if (existingStream && existingStream.type === "managed") {
    // Add as viewer to existing stream, reusing its Cloudflare live input
    const joinedStream = this.stateManager.createOrJoinManagedStream({
      userId,
      appId: packageName,
      liveInput: {
        liveInputId: existingStream.cfLiveInputId,
        rtmpUrl: existingStream.cfIngestUrl,
        hlsUrl: existingStream.hlsUrl,
        dashUrl: existingStream.dashUrl,
        webrtcUrl: existingStream.webrtcUrl
      }
    });
    return joinedStream.streamId;
  }

  // Create new Cloudflare live input
  const liveInput = await this.cloudflareService.createLiveInput(userId, {
    quality,
    enableWebRTC,
    enableRecording: true, // Required for playback
    requireSignedURLs: false, // Public streams
    restreamDestinations
  });

  // Create managed stream state
  const managedStream = this.stateManager.createOrJoinManagedStream({
    userId,
    appId: packageName,
    liveInput
  });

  // Start keep-alive
  this.startKeepAlive(userId, managedStream.streamId, managedStream.cfLiveInputId);

  // Send start to glasses with Cloudflare URL
  const startMessage: StartRtmpStream = {
    type: CloudToGlassesMessageType.START_RTMP_STREAM,
    sessionId: userSession.sessionId,
    rtmpUrl: liveInput.rtmpUrl, // Cloudflare ingest URL
    appId: "MANAGED_STREAM", // Special identifier
    streamId: managedStream.streamId,
    video: video ?? {},
    audio: audio ?? {},
    timestamp: new Date()
  };
  userSession.websocket.send(JSON.stringify(startMessage));

  // Start polling for playback URLs
  this.startPlaybackUrlPolling(userId, packageName, managedStream);

  return managedStream.streamId;
}
URL Discovery
Polling System
/**
 * Polls Cloudflare every 2 seconds until the stream is live, then sends an
 * "active" status with the playback URLs to every active viewer.
 *
 * Polling stops when the stream is live, when the user's current stream is no
 * longer this managed stream, when HLS and DASH URLs are already present, or
 * after 60 seconds. The interval is registered in `pollingIntervals` so that
 * stream cleanup and disposal can cancel it.
 *
 * @param userId User whose stream is being polled.
 * @param packageName App that requested the stream.
 * @param managedStream State of the stream being polled.
 */
private startPlaybackUrlPolling(
  userId: string,
  packageName: string,
  managedStream: ManagedStreamState
): void {
  // Never leave an earlier poller running for the same user.
  const previousInterval = this.pollingIntervals.get(userId);
  if (previousInterval) {
    clearInterval(previousInterval);
  }

  // Single exit path: stops both timers and drops the registry entry, but
  // only if the entry still belongs to this poller.
  const stopPolling = (): void => {
    clearInterval(pollInterval);
    clearTimeout(pollTimeout);
    if (this.pollingIntervals.get(userId) === pollInterval) {
      this.pollingIntervals.delete(userId);
    }
  };

  const pollInterval = setInterval(async () => {
    try {
      // Check if stream still active
      const currentStream = this.stateManager.getStreamState(userId);
      if (!currentStream || currentStream.type !== "managed" ||
          currentStream.streamId !== managedStream.streamId) {
        stopPolling();
        return;
      }

      // Check if URLs already discovered
      if (managedStream.hlsUrl && managedStream.dashUrl) {
        stopPolling();
        return;
      }

      // Check if stream is live
      const isLive = await this.cloudflareService.waitForStreamLive(
        managedStream.cfLiveInputId,
        1, // One attempt
        0 // No delay
      );
      if (!isLive) {
        return;
      }

      // Send URLs to all viewers
      // NOTE(review): `userSession` is neither a parameter nor a local of
      // this method in this excerpt; it has to be resolved for `userId`
      // before this call — confirm how the real implementation obtains it.
      for (const appId of managedStream.activeViewers) {
        await this.sendManagedStreamStatus(
          userSession,
          appId,
          managedStream.streamId,
          "active",
          "Stream is now live",
          managedStream.hlsUrl,
          managedStream.dashUrl,
          managedStream.webrtcUrl
        );
      }
      stopPolling();
    } catch (error) {
      // A rejected tick would otherwise surface as an unhandled promise
      // rejection; log it and let the next tick (or the timeout) proceed.
      this.logger.error(
        { error, userId, streamId: managedStream.streamId },
        "Playback URL polling tick failed"
      );
    }
  }, 2000); // Poll every 2 seconds

  // Register the poller so the timeout guard and cleanup can find it.
  this.pollingIntervals.set(userId, pollInterval);

  // Timeout after 60 seconds
  const pollTimeout = setTimeout(stopPolling, 60000);
}
Restream Management
Add Output
/**
 * Adds an RTMP restream output to a managed stream on behalf of an app.
 *
 * Rejects the request when the stream is missing or not managed, when the
 * per-stream or per-app output limit is reached, or when the destination URL
 * is already configured. On success the output is created in Cloudflare,
 * recorded on the stream with the requesting app as owner, and all viewers
 * are notified.
 *
 * @param streamId Id of the managed stream to extend.
 * @param packageName App requesting the output; recorded as its owner.
 * @param destination RTMP destination URL and optional display name.
 * @returns `success: true` with the Cloudflare output id, or `success: false`
 *   with an error code and, where available, a human-readable message.
 * @throws Error if Cloudflare returns no output for the request.
 */
async addRestreamOutput(
  streamId: string,
  packageName: string,
  destination: { url: string; name?: string }
): Promise<{ success: boolean; outputId?: string; error?: string; message?: string }> {
  const stream = this.stateManager.getStreamByStreamId(streamId);
  if (!stream || stream.type !== "managed") {
    return { success: false, error: "STREAM_NOT_FOUND" };
  }

  // `outputs` is optional on the stream state; initialise it on first use
  // so the checks below never dereference undefined.
  if (!stream.outputs) {
    stream.outputs = [];
  }
  const outputs = stream.outputs;

  // Check limits
  const MAX_OUTPUTS_PER_STREAM = 10;
  const MAX_OUTPUTS_PER_APP = 10;
  if (outputs.length >= MAX_OUTPUTS_PER_STREAM) {
    return {
      success: false,
      error: "MAX_OUTPUTS_REACHED",
      message: `Stream has reached maximum of ${MAX_OUTPUTS_PER_STREAM} outputs`
    };
  }
  const appOutputCount = outputs.filter(o => o.addedBy === packageName).length;
  if (appOutputCount >= MAX_OUTPUTS_PER_APP) {
    return {
      success: false,
      error: "MAX_APP_OUTPUTS_REACHED",
      message: `Your app has reached maximum of ${MAX_OUTPUTS_PER_APP} outputs`
    };
  }

  // Check for duplicate
  if (outputs.some(o => o.url === destination.url)) {
    return {
      success: false,
      error: "DUPLICATE_URL",
      message: "This RTMP URL is already configured"
    };
  }

  // Create via Cloudflare
  const cfOutputs = await this.cloudflareService.createOutputs(
    stream.cfLiveInputId,
    [destination]
  );
  const cfOutput = cfOutputs[0];
  if (!cfOutput) {
    // Fail with a descriptive error instead of a TypeError on `cfOutput.uid`.
    throw new Error(`Cloudflare returned no output for ${destination.url}`);
  }

  // Add to stream state with ownership
  outputs.push({
    cfOutputId: cfOutput.uid,
    url: destination.url,
    name: destination.name,
    addedBy: packageName,
    status: cfOutput
  });

  // Notify all viewers
  await this.notifyOutputsChanged(stream);

  return { success: true, outputId: cfOutput.uid };
}
Remove Output
/**
 * Removes a restream output from a managed stream.
 *
 * Only the app that added an output may remove it. On success the output is
 * deleted in Cloudflare, dropped from the stream state, and all viewers are
 * notified.
 *
 * @param streamId Id of the managed stream that owns the output.
 * @param outputId Cloudflare id of the output to remove.
 * @param packageName App requesting the removal.
 * @returns `success: true`, or `success: false` with an error code and,
 *   where available, a human-readable message.
 */
async removeRestreamOutput(
  streamId: string,
  outputId: string,
  packageName: string
): Promise<{ success: boolean; error?: string; message?: string }> {
  const stream = this.stateManager.getStreamByStreamId(streamId);
  // Same guard as the add path: the lookup can miss or return another type.
  if (!stream || stream.type !== "managed") {
    return { success: false, error: "STREAM_NOT_FOUND" };
  }

  // `outputs` is optional on the stream state.
  const outputs = stream.outputs ?? [];
  const output = outputs.find(o => o.cfOutputId === outputId);
  if (!output) {
    return {
      success: false,
      error: "OUTPUT_NOT_FOUND",
      message: "Output not found"
    };
  }

  // Check ownership
  if (output.addedBy !== packageName) {
    return {
      success: false,
      error: "NOT_AUTHORIZED",
      message: "You can only remove outputs that you added"
    };
  }

  // Remove from Cloudflare
  await this.cloudflareService.deleteOutput(stream.cfLiveInputId, outputId);

  // Remove from state
  stream.outputs = outputs.filter(o => o.cfOutputId !== outputId);

  // Notify viewers
  await this.notifyOutputsChanged(stream);

  return { success: true };
}
Keep-Alive System
Keep-Alive Management
/**
 * Keep-alive bookkeeping held per user for a managed stream.
 */
interface ManagedStreamKeepAlive {
  /** User the keep-alive belongs to. */
  userId: string;
  /** Managed stream being kept alive. */
  streamId: string;
  /** Cloudflare Live Input ID of that stream. */
  cfLiveInputId: string;
  /** Handle of the periodic keep-alive timer, once scheduled. */
  keepAliveTimer?: NodeJS.Timeout;
  /** Refreshed whenever a pending keep-alive is acknowledged. */
  lastKeepAlive: Date;
  /** Outstanding keep-alives keyed by ACK id, each with its timeout handle. */
  pendingAcks: Map<string, { sentAt: Date; timeout: NodeJS.Timeout }>;
  /** Missed-ACK counter; reset to zero on a successful ACK. */
  missedAcks: number;
}
/**
 * Begins keep-alive tracking for a user's managed stream: schedules the
 * periodic keep-alive send and records a fresh tracker for the user.
 *
 * @param userId User whose stream is kept alive.
 * @param streamId Managed stream id stored on the tracker.
 * @param cfLiveInputId Cloudflare Live Input ID stored on the tracker.
 */
private startKeepAlive(
  userId: string,
  streamId: string,
  cfLiveInputId: string
): void {
  const tracker: ManagedStreamKeepAlive = {
    userId,
    streamId,
    cfLiveInputId,
    lastKeepAlive: new Date(),
    pendingAcks: new Map(),
    missedAcks: 0
  };

  // Fires every KEEP_ALIVE_INTERVAL_MS (15 seconds)
  const sendTick = (): void => {
    this.sendKeepAlive(userId);
  };
  tracker.keepAliveTimer = setInterval(sendTick, KEEP_ALIVE_INTERVAL_MS);

  this.managedKeepAlive.set(userId, tracker);
}
ACK Handling
/**
 * Processes a keep-alive acknowledgement for a user.
 *
 * ACKs for users without keep-alive tracking, or with an id that is not
 * pending, are ignored. A matching ACK cancels its timeout, is removed from
 * the pending set, resets the missed-ACK counter and refreshes the
 * last-keep-alive timestamp.
 *
 * @param userId User the ACK was received for.
 * @param ack Acknowledgement carrying the id of the keep-alive it answers.
 */
handleKeepAliveAck(userId: string, ack: KeepAliveAck): void {
  const tracker = this.managedKeepAlive.get(userId);
  if (!tracker) {
    return;
  }

  const { ackId } = ack;
  const pending = tracker.pendingAcks.get(ackId);
  if (!pending) {
    return;
  }

  clearTimeout(pending.timeout);
  tracker.pendingAcks.delete(ackId);
  tracker.missedAcks = 0; // Reset on successful ACK
  tracker.lastKeepAlive = new Date();
}
Status Management
Status Distribution
/**
 * Sends a managed-stream status message to one app, skipping duplicates.
 *
 * Playback URLs passed explicitly take precedence; otherwise the URLs stored
 * on the stream state are used. Restream outputs on the stream are converted
 * to the status format ("active", "error" or "stopped"). Nothing is sent, and
 * nothing is recorded as sent, when the app has no open WebSocket.
 *
 * @param userSession Session holding the app WebSocket connections.
 * @param packageName App that receives the status.
 * @param streamId Managed stream the status refers to.
 * @param status Status value to report.
 * @param message Optional human-readable message.
 * @param hlsUrl Optional HLS URL overriding the one on the stream state.
 * @param dashUrl Optional DASH URL overriding the one on the stream state.
 * @param webrtcUrl Optional WebRTC URL overriding the one on the stream state.
 */
private async sendManagedStreamStatus(
  userSession: UserSession,
  packageName: string,
  streamId: string,
  status: ManagedStreamStatus["status"],
  message?: string,
  hlsUrl?: string,
  dashUrl?: string,
  webrtcUrl?: string
): Promise<void> {
  // The lookup can miss (the add-output path null-checks the same call), so
  // every read from `stream` below is optional.
  const stream = this.stateManager.getStreamByStreamId(streamId);

  // Convert outputs to status format
  const streamOutputs = stream?.outputs ?? [];
  const outputs: OutputStatus[] | undefined =
    streamOutputs.length > 0
      ? streamOutputs.map(output => {
          const current = output.status?.status?.current;
          return {
            url: output.url,
            name: output.name,
            status:
              current?.state === "connected"
                ? "active"
                : current?.state === "error"
                  ? "error"
                  : "stopped",
            error: current?.lastError
          };
        })
      : undefined;

  const statusMessage: ManagedStreamStatus = {
    type: CloudToAppMessageType.MANAGED_STREAM_STATUS,
    status,
    hlsUrl: hlsUrl !== undefined ? hlsUrl : stream?.hlsUrl,
    dashUrl: dashUrl !== undefined ? dashUrl : stream?.dashUrl,
    webrtcUrl: webrtcUrl !== undefined ? webrtcUrl : stream?.webrtcUrl,
    streamId,
    message,
    outputs
  };

  // Check for duplicate status
  const statusKey = `${streamId}:${packageName}`;
  const lastStatus = this.lastSentStatus.get(statusKey);
  if (lastStatus && this.isDuplicateStatus(lastStatus, statusMessage)) {
    return; // Skip duplicate
  }

  // Send to app
  // NOTE(review): assumes app sockets expose `readyState` like the glasses
  // socket checked during stream cleanup — confirm.
  const appWs = userSession.appWebsockets.get(packageName);
  if (!appWs || appWs.readyState !== WebSocket.OPEN) {
    // Not recorded as sent, so the next status for this app still goes out.
    this.logger.warn(
      { packageName, streamId },
      "Skipping managed stream status: app WebSocket not open"
    );
    return;
  }
  appWs.send(JSON.stringify(statusMessage));

  // Track last sent
  this.lastSentStatus.set(statusKey, statusMessage);
}
Conflict Detection
/**
 * Asks the state manager whether starting an unmanaged stream for this user
 * would conflict with their current stream.
 *
 * @param userId User to check.
 * @returns `true` when the state manager reports a conflict.
 */
checkUnmanagedStreamConflict(userId: string): boolean {
  const { hasConflict } = this.stateManager.checkStreamConflict(userId, "unmanaged");
  return hasConflict;
}
Cleanup
Stream Cleanup
/**
 * Tears down a user's managed stream: stops keep-alive and URL polling, tells
 * the glasses to stop streaming when their socket is open, removes the stream
 * from state, and forgets the last status sent for this stream.
 *
 * @param userSession Session whose glasses socket receives the stop message.
 * @param userId User owning the stream.
 * @param stream Managed stream being cleaned up.
 */
private async cleanupManagedStream(
  userSession: UserSession,
  userId: string,
  stream: ManagedStreamState
): Promise<void> {
  // Keep-alive first
  this.stopKeepAlive(userId);

  // Then any playback-URL poller registered for the user
  const activePoller = this.pollingIntervals.get(userId);
  if (activePoller) {
    clearInterval(activePoller);
    this.pollingIntervals.delete(userId);
  }

  // Tell the glasses to stop, but only over an open socket
  const glassesSocketOpen = userSession.websocket?.readyState === WebSocket.OPEN;
  if (glassesSocketOpen) {
    const stopMessage: StopRtmpStream = {
      type: CloudToGlassesMessageType.STOP_RTMP_STREAM,
      sessionId: userSession.sessionId,
      appId: "MANAGED_STREAM",
      streamId: stream.streamId,
      timestamp: new Date()
    };
    const payload = JSON.stringify(stopMessage);
    userSession.websocket.send(payload);
  }

  // Drop the stream from state
  this.stateManager.removeStream(userId);

  // Forget every per-app status recorded under this stream id
  const statusKeyPrefix = `${stream.streamId}:`;
  for (const statusKey of this.lastSentStatus.keys()) {
    if (statusKey.startsWith(statusKeyPrefix)) {
      this.lastSentStatus.delete(statusKey);
    }
  }
}
Periodic Cleanup
/**
 * Periodic housekeeping: asks the state manager to drop inactive streams
 * (60 minute timeout) and logs how many were removed.
 */
private async performCleanup(): Promise<void> {
  const inactivityTimeoutMinutes = 60;
  const removedUsers = this.stateManager.cleanupInactiveStreams(inactivityTimeoutMinutes);
  const removedStreams = removedUsers.length;

  this.logger.info({ removedStreams }, "Performed periodic cleanup");
}
Configuration
Constants
/** Interval between keep-alive messages: 15 seconds. */
const KEEP_ALIVE_INTERVAL_MS = 15 * 1000;
/** Time allowed for a keep-alive ACK: 5 seconds. */
const ACK_TIMEOUT_MS = 5 * 1000;
/** Missed ACKs tolerated before cleanup. */
const MAX_MISSED_ACKS = 3;
/** Maximum total restream outputs on one stream. */
const MAX_OUTPUTS_PER_STREAM = 10;
/** Maximum restream outputs a single app may add. */
const MAX_OUTPUTS_PER_APP = 10;
Lifecycle Management
Disposal
/**
 * Releases everything the extension holds: stops keep-alive for every tracked
 * user, cancels and forgets all playback-URL pollers, and clears the
 * duplicate-status cache.
 */
dispose(): void {
  // Stop all keep-alive timers
  for (const userId of this.managedKeepAlive.keys()) {
    this.stopKeepAlive(userId);
  }

  // Stop all polling intervals and drop their registry entries, so no stale
  // handle stays registered after disposal.
  for (const interval of this.pollingIntervals.values()) {
    clearInterval(interval);
  }
  this.pollingIntervals.clear();

  // Clear tracking
  this.lastSentStatus.clear();

  this.logger.info("ManagedStreamingExtension disposed");
}
Best Practices
- Always check stream conflicts before starting
- Validate app ownership for output operations
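- Bound URL-discovery polling: poll on a fixed interval (every 2 seconds here), stop as soon as the stream is live, and give up after a timeout (60 seconds here)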
- Track last sent status to avoid duplicates
- Clean up resources when streams end
- Handle ACK timeouts to detect dead streams (5-second ACK timeout; cleanup after 3 missed ACKs)
- Enforce limits on outputs per stream/app
Integration Points
- VideoManager: Works alongside for stream management
- CloudflareStreamService: Manages live input infrastructure
- StreamStateManager: Tracks stream state and conflicts
- SessionService: Retrieves user sessions
- WebSocket: Communicates with glasses