Overview

The Subscription Service is responsible for managing which apps receive which data streams from user sessions. It handles subscription lifecycle, validates permissions, tracks history, and provides efficient queries for message routing. File: packages/cloud/src/services/session/subscription.service.ts

Key Features

  1. In-memory subscription storage for fast access
  2. Permission validation for each subscription
  3. Language-specific subscriptions (e.g., transcription:en-US)
  4. Wildcard subscriptions (* or all)
  5. Subscription history tracking for debugging
  6. Calendar event and location caching
  7. Rate limiting for location updates

Data Structures

Subscription Storage

// Active subscriptions keyed by "sessionId:packageName"
private subscriptions = new Map<string, Set<ExtendedStreamType>>();

// History tracking for debugging
private history = new Map<string, SubscriptionHistory[]>();

// Version tracking for concurrent updates
private subscriptionUpdateVersion = new Map<string, number>();

Subscription History

interface SubscriptionHistory {
  timestamp: Date;
  subscriptions: ExtendedStreamType[];
  action: "add" | "remove" | "update";
}

Core Operations

Update Subscriptions

Updates app subscriptions with permission validation and history tracking:
async updateSubscriptions(
  userSession: UserSession,
  packageName: string,
  subscriptions: SubscriptionRequest[]
): Promise<UserI | null> {
  const key = this.getKey(userSession.userId, packageName);
  
  // Version tracking for concurrent updates
  const currentVersion = (this.subscriptionUpdateVersion.get(key) || 0) + 1;
  this.subscriptionUpdateVersion.set(key, currentVersion);
  
  // Create new subscription set
  const newSubscriptions = new Set<ExtendedStreamType>();
  
  // Process each subscription request
  for (const subRequest of subscriptions) {
    const streamType = subRequest.type;
    
    // Validate permission for stream type
    const hasPermission = await SimplePermissionChecker.checkPermission(
      packageName, 
      streamType
    );
    
    if (!hasPermission) {
      throw new Error(`Missing permission for stream type: ${streamType}`);
    }
    
    // Add base and language-specific subscriptions
    newSubscriptions.add(streamType);
    
    if (subRequest.config?.languages) {
      for (const lang of subRequest.config.languages) {
        const langStream = createTranscriptionStream(streamType, lang);
        newSubscriptions.add(langStream);
      }
    }
  }
  
  // Store subscriptions
  this.subscriptions.set(key, newSubscriptions);
  
  // Track history
  this.addToHistory(key, Array.from(newSubscriptions), "update");
  
  // Persist location rate to database if needed
  // ... database operations with retry logic
}

Query Subscriptions

Get Subscribed Apps

Find all apps subscribed to a specific stream:
getSubscribedApps(
  userSession: UserSession,
  subscription: ExtendedStreamType
): string[] {
  const sessionId = userSession.sessionId;
  const subscribedApps: string[] = [];
  
  for (const [key, subs] of this.subscriptions.entries()) {
    if (!key.startsWith(`${sessionId}:`)) continue;
    
    const [, packageName] = key.split(":");
    
    for (const sub of subs) {
      // Check exact match or wildcards
      if (sub === subscription || 
          sub === StreamType.ALL || 
          sub === StreamType.WILDCARD) {
        subscribedApps.push(packageName);
        break;
      }
      
      // Check language-specific matches
      if (isLanguageStream(sub) && isLanguageStream(subscription)) {
        const subInfo = parseLanguageStream(sub);
        const reqInfo = parseLanguageStream(subscription);
        
        if (subInfo?.type === reqInfo?.type && 
            subInfo?.language === reqInfo?.language) {
          subscribedApps.push(packageName);
          break;
        }
      }
    }
  }
  
  return subscribedApps;
}

Check Media Subscriptions

Determine what types of media processing are needed:
hasPCMTranscriptionSubscriptions(sessionId: string): {
  hasMedia: boolean;
  hasPCM: boolean;
  hasTranscription: boolean;
} {
  let hasMedia = false;
  let hasPCM = false;
  let hasTranscription = false;
  
  for (const [key, subs] of this.subscriptions.entries()) {
    if (!key.startsWith(sessionId + ":")) continue;
    
    for (const sub of subs) {
      if (sub === StreamType.AUDIO_CHUNK) {
        hasPCM = true;
        hasMedia = true;
      } else if (sub === StreamType.TRANSLATION || 
                 sub === StreamType.TRANSCRIPTION) {
        hasTranscription = true;
        hasMedia = true;
      } else {
        // Check language-specific transcription/translation
        const langInfo = parseLanguageStream(sub as string);
        if (langInfo && 
            (langInfo.type === StreamType.TRANSLATION || 
             langInfo.type === StreamType.TRANSCRIPTION)) {
          hasTranscription = true;
          hasMedia = true;
        }
      }
    }
  }
  
  return { hasMedia, hasPCM, hasTranscription };
}

Language Subscriptions

Get minimal set of languages needed for transcription:
getMinimalLanguageSubscriptions(sessionId: string): string[] {
  const languages = new Set<string>();
  
  for (const [key, subs] of this.subscriptions.entries()) {
    if (!key.startsWith(sessionId + ":")) continue;
    
    for (const sub of subs) {
      const langInfo = parseLanguageStream(sub as string);
      if (langInfo?.language) {
        languages.add(langInfo.language);
      }
    }
  }
  
  return Array.from(languages);
}

Caching Services

Calendar Events

// Cache calendar events per session
cacheCalendarEvent(sessionId: string, event: CalendarEvent): void {
  if (!this.calendarEventsCache.has(sessionId)) {
    this.calendarEventsCache.set(sessionId, []);
  }
  this.calendarEventsCache.get(sessionId)!.push(event);
}

// Retrieve all events
getAllCalendarEvents(sessionId: string): CalendarEvent[] {
  return this.calendarEventsCache.get(sessionId) || [];
}

Location Data

// Cache last known location
cacheLocation(sessionId: string, location: Location): void {
  this.lastLocationCache.set(sessionId, location);
}

// Retrieve last location
getLastLocation(sessionId: string): Location | undefined {
  return this.lastLocationCache.get(sessionId);
}

Session Cleanup

Remove all subscriptions and cached data for a session:
removeAllSubscriptionsForSession(sessionId: string): void {
  const removedKeys: string[] = [];
  
  // Remove subscriptions
  for (const key of this.subscriptions.keys()) {
    if (key.startsWith(`${sessionId}:`)) {
      this.subscriptions.delete(key);
      removedKeys.push(key);
    }
  }
  
  // Clear caches
  this.calendarEventsCache.delete(sessionId);
  this.lastLocationCache.delete(sessionId);
  
  // Clean up history and version tracking
  for (const key of removedKeys) {
    this.history.delete(key);
    this.subscriptionUpdateVersion.delete(key);
  }
}

Permission Integration

The service integrates with the SimplePermissionChecker to validate each subscription:
const hasPermission = await SimplePermissionChecker.checkPermission(
  packageName, 
  streamType
);

if (!hasPermission) {
  throw new Error(`App ${packageName} lacks permission for ${streamType}`);
}

Database Persistence

Location subscription rates are persisted to the User model with retry logic:
// Retry logic for database operations
const maxRetries = 3;
for (let attempt = 0; attempt < maxRetries; attempt++) {
  try {
    const user = await User.findOne({ email: userSession.userId });
    
    // Update location subscription rate
    if (locationRate) {
      user.locationSubscriptions.set(sanitizedPackageName, {
        rate: locationRate
      });
    }
    
    user.markModified("locationSubscriptions");
    await user.save();
    break;
  } catch (error) {
    if (error.name === "VersionError" && attempt < maxRetries - 1) {
      // Exponential backoff
      await new Promise(resolve => 
        setTimeout(resolve, Math.pow(2, attempt) * 100)
      );
    } else {
      throw error;
    }
  }
}

Best Practices

  1. Always validate permissions before accepting subscriptions
  2. Use version tracking for concurrent update detection
  3. Implement retry logic for database operations
  4. Clean up session data to prevent memory leaks
  5. Log subscription changes for debugging
  6. Sanitize package names for MongoDB keys