async startManagedStream(
userSession: UserSession,
request: ManagedStreamRequest
): Promise<string> {
const { packageName, quality, enableWebRTC, video, audio } = request;
// Validation
if (!userSession.appManager.isAppRunning(packageName)) {
throw new Error(`App ${packageName} is not running`);
}
// Check conflicts
const conflict = this.stateManager.checkStreamConflict(userId, "managed");
if (conflict.hasConflict) {
throw new Error(conflict.message);
}
// Check for existing stream
const existingStream = this.stateManager.getStreamState(userId);
if (existingStream && existingStream.type === "managed") {
// Add as viewer to existing stream
const managedStream = this.stateManager.createOrJoinManagedStream({
userId,
appId: packageName,
liveInput: {
liveInputId: existingStream.cfLiveInputId,
rtmpUrl: existingStream.cfIngestUrl,
hlsUrl: existingStream.hlsUrl,
dashUrl: existingStream.dashUrl,
webrtcUrl: existingStream.webrtcUrl
}
});
return managedStream.streamId;
}
// Create new Cloudflare live input
const liveInput = await this.cloudflareService.createLiveInput(userId, {
quality,
enableWebRTC,
enableRecording: true, // Required for playback
requireSignedURLs: false, // Public streams
restreamDestinations
});
// Create managed stream state
const managedStream = this.stateManager.createOrJoinManagedStream({
userId,
appId: packageName,
liveInput
});
// Start keep-alive
this.startKeepAlive(userId, managedStream.streamId, managedStream.cfLiveInputId);
// Send start to glasses with Cloudflare URL
const startMessage: StartRtmpStream = {
type: CloudToGlassesMessageType.START_RTMP_STREAM,
sessionId: userSession.sessionId,
rtmpUrl: liveInput.rtmpUrl, // Cloudflare ingest URL
appId: "MANAGED_STREAM", // Special identifier
streamId: managedStream.streamId,
video: video || {},
audio: audio || {},
timestamp: new Date()
};
userSession.websocket.send(JSON.stringify(startMessage));
// Start polling for playback URLs
this.startPlaybackUrlPolling(userId, packageName, managedStream);
return managedStream.streamId;
}