Documentation Index
Fetch the complete documentation index at: https://cloud-docs.mentra.glass/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The Subscription Service is responsible for managing which apps receive which data streams from user sessions. It handles subscription lifecycle, validates permissions, tracks history, and provides efficient queries for message routing.
File: packages/cloud/src/services/session/subscription.service.ts
Key Features
- In-memory subscription storage for fast access
- Permission validation for each subscription
- Language-specific subscriptions (e.g.,
transcription:en-US)
- Wildcard subscriptions (
* or all)
- Subscription history tracking for debugging
- Calendar event and location caching
- Rate limiting for location updates
Data Structures
Subscription Storage
// Active subscriptions keyed by "sessionId:packageName"
private subscriptions = new Map<string, Set<ExtendedStreamType>>();
// History tracking for debugging
private history = new Map<string, SubscriptionHistory[]>();
// Version tracking for concurrent updates
private subscriptionUpdateVersion = new Map<string, number>();
Subscription History
interface SubscriptionHistory {
timestamp: Date;
subscriptions: ExtendedStreamType[];
action: "add" | "remove" | "update";
}
Core Operations
Update Subscriptions
Updates app subscriptions with permission validation and history tracking:
async updateSubscriptions(
userSession: UserSession,
packageName: string,
subscriptions: SubscriptionRequest[]
): Promise<UserI | null> {
const key = this.getKey(userSession.userId, packageName);
// Version tracking for concurrent updates
const currentVersion = (this.subscriptionUpdateVersion.get(key) || 0) + 1;
this.subscriptionUpdateVersion.set(key, currentVersion);
// Create new subscription set
const newSubscriptions = new Set<ExtendedStreamType>();
// Process each subscription request
for (const subRequest of subscriptions) {
const streamType = subRequest.type;
// Validate permission for stream type
const hasPermission = await SimplePermissionChecker.checkPermission(
packageName,
streamType
);
if (!hasPermission) {
throw new Error(`Missing permission for stream type: ${streamType}`);
}
// Add base and language-specific subscriptions
newSubscriptions.add(streamType);
if (subRequest.config?.languages) {
for (const lang of subRequest.config.languages) {
const langStream = createTranscriptionStream(streamType, lang);
newSubscriptions.add(langStream);
}
}
}
// Store subscriptions
this.subscriptions.set(key, newSubscriptions);
// Track history
this.addToHistory(key, Array.from(newSubscriptions), "update");
// Persist location rate to database if needed
// ... database operations with retry logic
}
Query Subscriptions
Get Subscribed Apps
Find all apps subscribed to a specific stream:
getSubscribedApps(
userSession: UserSession,
subscription: ExtendedStreamType
): string[] {
const sessionId = userSession.sessionId;
const subscribedApps: string[] = [];
for (const [key, subs] of this.subscriptions.entries()) {
if (!key.startsWith(`${sessionId}:`)) continue;
const [, packageName] = key.split(":");
for (const sub of subs) {
// Check exact match or wildcards
if (sub === subscription ||
sub === StreamType.ALL ||
sub === StreamType.WILDCARD) {
subscribedApps.push(packageName);
break;
}
// Check language-specific matches
if (isLanguageStream(sub) && isLanguageStream(subscription)) {
const subInfo = parseLanguageStream(sub);
const reqInfo = parseLanguageStream(subscription);
if (subInfo?.type === reqInfo?.type &&
subInfo?.language === reqInfo?.language) {
subscribedApps.push(packageName);
break;
}
}
}
}
return subscribedApps;
}
Determine what types of media processing are needed:
hasPCMTranscriptionSubscriptions(sessionId: string): {
hasMedia: boolean;
hasPCM: boolean;
hasTranscription: boolean;
} {
let hasMedia = false;
let hasPCM = false;
let hasTranscription = false;
for (const [key, subs] of this.subscriptions.entries()) {
if (!key.startsWith(sessionId + ":")) continue;
for (const sub of subs) {
if (sub === StreamType.AUDIO_CHUNK) {
hasPCM = true;
hasMedia = true;
} else if (sub === StreamType.TRANSLATION ||
sub === StreamType.TRANSCRIPTION) {
hasTranscription = true;
hasMedia = true;
} else {
// Check language-specific transcription/translation
const langInfo = parseLanguageStream(sub as string);
if (langInfo &&
(langInfo.type === StreamType.TRANSLATION ||
langInfo.type === StreamType.TRANSCRIPTION)) {
hasTranscription = true;
hasMedia = true;
}
}
}
}
return { hasMedia, hasPCM, hasTranscription };
}
Language Subscriptions
Get minimal set of languages needed for transcription:
getMinimalLanguageSubscriptions(sessionId: string): string[] {
const languages = new Set<string>();
for (const [key, subs] of this.subscriptions.entries()) {
if (!key.startsWith(sessionId + ":")) continue;
for (const sub of subs) {
const langInfo = parseLanguageStream(sub as string);
if (langInfo?.language) {
languages.add(langInfo.language);
}
}
}
return Array.from(languages);
}
Caching Services
Calendar Events
// Cache calendar events per session
cacheCalendarEvent(sessionId: string, event: CalendarEvent): void {
if (!this.calendarEventsCache.has(sessionId)) {
this.calendarEventsCache.set(sessionId, []);
}
this.calendarEventsCache.get(sessionId)!.push(event);
}
// Retrieve all events
getAllCalendarEvents(sessionId: string): CalendarEvent[] {
return this.calendarEventsCache.get(sessionId) || [];
}
Location Data
// Cache last known location
cacheLocation(sessionId: string, location: Location): void {
this.lastLocationCache.set(sessionId, location);
}
// Retrieve last location
getLastLocation(sessionId: string): Location | undefined {
return this.lastLocationCache.get(sessionId);
}
Session Cleanup
Remove all subscriptions and cached data for a session:
removeAllSubscriptionsForSession(sessionId: string): void {
const removedKeys: string[] = [];
// Remove subscriptions
for (const key of this.subscriptions.keys()) {
if (key.startsWith(`${sessionId}:`)) {
this.subscriptions.delete(key);
removedKeys.push(key);
}
}
// Clear caches
this.calendarEventsCache.delete(sessionId);
this.lastLocationCache.delete(sessionId);
// Clean up history and version tracking
for (const key of removedKeys) {
this.history.delete(key);
this.subscriptionUpdateVersion.delete(key);
}
}
Permission Integration
The service integrates with the SimplePermissionChecker to validate each subscription:
const hasPermission = await SimplePermissionChecker.checkPermission(
packageName,
streamType
);
if (!hasPermission) {
throw new Error(`App ${packageName} lacks permission for ${streamType}`);
}
Database Persistence
Location subscription rates are persisted to the User model with retry logic:
// Retry logic for database operations
const maxRetries = 3;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
const user = await User.findOne({ email: userSession.userId });
// Update location subscription rate
if (locationRate) {
user.locationSubscriptions.set(sanitizedPackageName, {
rate: locationRate
});
}
user.markModified("locationSubscriptions");
await user.save();
break;
} catch (error) {
if (error.name === "VersionError" && attempt < maxRetries - 1) {
// Exponential backoff
await new Promise(resolve =>
setTimeout(resolve, Math.pow(2, attempt) * 100)
);
} else {
throw error;
}
}
}
Best Practices
- Always validate permissions before accepting subscriptions
- Use version tracking for concurrent update detection
- Implement retry logic for database operations
- Clean up session data to prevent memory leaks
- Log subscription changes for debugging
- Sanitize package names for MongoDB keys