Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Hybrid tracks in Moqtail combine the best of both worlds: live streaming for real-time content and cached access for historical data. This pattern is essential for applications that need both low-latency live viewing and the ability to seek or replay past content.
Hybrid tracks are ideal for live broadcasts with DVR functionality, live sports with instant replay, or any scenario requiring both live tail and historical access.

HybridTrackSource

The HybridTrackSource class manages both live and cached content:
import {
  HybridTrackSource,
  MemoryObjectCache,
  MoqtObject
} from 'moqtail-ts'

// Create cache for historical objects
const cache = new MemoryObjectCache()

// Create live stream
const liveStream = new ReadableStream<MoqtObject>({
  async start(controller) {
    // Generate live objects
    while (capturing) {
      const object = await generateObject()
      controller.enqueue(object)
      
      // Also add to cache for later access
      cache.add(object)
    }
  }
})

// Create hybrid source
const hybridSource = new HybridTrackSource(cache, liveStream)

Track Source Interface

The hybrid source implements both LiveObjectSource and PastObjectSource:
interface TrackSource {
  readonly past?: PastObjectSource   // Historical access
  readonly live?: LiveObjectSource   // Live streaming
}
1

Past Access

Retrieve historical objects from the cache:
const pastSource = hybridSource.past
const historicalObjects = await pastSource.getRange(
  new Location(0n, 0n),
  new Location(50n, 0n)
)
2

Live Access

Receive new objects as they’re produced:
const liveSource = hybridSource.live
const stream = liveSource.stream

// Or listen for new objects
liveSource.onNewObject((obj) => {
  console.log('New live object:', obj.groupId, obj.objectId)
})

Live Broadcast with DVR Example

Complete implementation of a live broadcast with replay capability:
import {
  MOQtailClient,
  HybridTrackSource,
  RingBufferObjectCache,
  MoqtObject,
  Location,
  FullTrackName,
  ObjectForwardingPreference,
  Track
} from 'moqtail-ts'

// Configuration
const DVR_WINDOW_SECONDS = 3600 // 1 hour of history
const GOP_DURATION_MS = 2000 // 2 second GOPs
const FRAME_RATE = 30
const OBJECTS_PER_GOP = FRAME_RATE * (GOP_DURATION_MS / 1000)
const MAX_CACHED_OBJECTS = Math.ceil(
  (DVR_WINDOW_SECONDS * 1000 / GOP_DURATION_MS) * OBJECTS_PER_GOP
)

class LiveBroadcastPublisher {
  private groupId = 0n
  private objectId = 0n
  private frameCount = 0
  private cache: RingBufferObjectCache
  private fullTrackName: FullTrackName
  
  constructor(
    private channelId: string,
    private streamName: string
  ) {
    // Use ring buffer to maintain fixed DVR window
    this.cache = new RingBufferObjectCache(MAX_CACHED_OBJECTS)
    this.fullTrackName = FullTrackName.tryNew(
      `live/${channelId}`,
      streamName
    )
  }
  
  createLiveStream(): ReadableStream<MoqtObject> {
    return new ReadableStream<MoqtObject>({
      start: async (controller) => {
        // Get video source
        const stream = await navigator.mediaDevices.getUserMedia({
          video: {
            width: 1920,
            height: 1080,
            frameRate: 30
          }
        })
        
        const videoTrack = stream.getVideoTracks()[0]
        const processor = new MediaStreamTrackProcessor({ track: videoTrack })
        const reader = processor.readable.getReader()
        
        // Set up encoder
        const encoder = new VideoEncoder({
          output: (chunk, metadata) => {
            this.handleEncodedChunk(chunk, metadata, controller)
          },
          error: (error) => {
            console.error('Encoding error:', error)
            controller.error(error)
          }
        })
        
        encoder.configure({
          codec: 'vp09.00.10.08',
          width: 1920,
          height: 1080,
          bitrate: 5_000_000,
          framerate: 30
        })
        
        // Encode loop
        try {
          while (true) {
            const { done, value: frame } = await reader.read()
            if (done) break
            
            const isKeyframe = this.frameCount % OBJECTS_PER_GOP === 0
            encoder.encode(frame, { keyFrame: isKeyframe })
            frame.close()
            
            this.frameCount++
          }
        } finally {
          await encoder.flush()
          controller.close()
        }
      }
    })
  }
  
  private handleEncodedChunk(
    chunk: EncodedVideoChunk,
    metadata: EncodedVideoChunkMetadata,
    controller: ReadableStreamDefaultController<MoqtObject>
  ) {
    const isKeyframe = chunk.type === 'key'
    
    // Create MOQT object
    const data = new Uint8Array(chunk.byteLength)
    chunk.copyTo(data)
    
    const object = MoqtObject.newWithPayload(
      this.fullTrackName,
      new Location(this.groupId, this.objectId),
      isKeyframe ? 0 : 8,
      ObjectForwardingPreference.Subgroup,
      null,
      null,
      data
    )
    
    // Add to cache (ring buffer automatically evicts old objects)
    this.cache.add(object)
    
    // Send to live subscribers
    controller.enqueue(object)
    
    // Advance location
    if (isKeyframe && this.objectId > 0n) {
      // New GOP
      this.groupId++
      this.objectId = 0n
    } else {
      this.objectId++
    }
  }
  
  createTrack(): Track {
    const liveStream = this.createLiveStream()
    const hybridSource = new HybridTrackSource(this.cache, liveStream)
    
    return {
      fullTrackName: this.fullTrackName,
      forwardingPreference: ObjectForwardingPreference.Subgroup,
      trackSource: hybridSource,
      publisherPriority: 0
    }
  }
  
  getCacheInfo() {
    return {
      size: this.cache.size(),
      maxSize: MAX_CACHED_OBJECTS,
      utilizationPercent: (this.cache.size() / MAX_CACHED_OBJECTS) * 100
    }
  }
}

// Usage
async function startBroadcast() {
  const publisher = new LiveBroadcastPublisher('conference-room-42', 'video')
  const track = publisher.createTrack()
  
  const client = await MOQtailClient.new({
    url: 'https://relay.example.com',
    supportedVersions: [0xff00000b]
  })
  
  client.addOrUpdateTrack(track)
  await client.publishNamespace(['live', 'conference-room-42'])
  
  // Monitor cache
  setInterval(() => {
    const info = publisher.getCacheInfo()
    console.log(`Cache: ${info.size}/${info.maxSize} (${info.utilizationPercent.toFixed(1)}%)`)
  }, 5000)
  
  return { client, publisher }
}

Subscriber Interaction Patterns

With hybrid tracks, subscribers can use different access patterns:

Pattern 1: Live Tail

Subscriber joins live and receives new objects:
// Subscriber code
const result = await client.subscribe({
  fullTrackName,
  filterType: FilterType.LatestObject,
  forward: true,
  groupOrder: GroupOrder.Original,
  priority: 0
})

if (!(result instanceof SubscribeError)) {
  for await (const obj of result.stream) {
    // Receive live objects
    displayFrame(obj)
  }
}

Pattern 2: Historical Fetch + Live Tail

Subscriber fetches recent history, then subscribes to live:
// 1. Fetch last 30 seconds
const fetchResult = await client.fetch({
  priority: 32,
  groupOrder: GroupOrder.Original,
  typeAndProps: {
    type: FetchType.StandAlone,
    props: {
      fullTrackName,
      startLocation: new Location(currentGroup - 15n, 0n), // ~30s ago
      endLocation: new Location(currentGroup, 0n)
    }
  }
})

if (!(fetchResult instanceof FetchError)) {
  // Display historical objects
  for await (const obj of fetchResult.stream) {
    displayFrame(obj)
  }
}

// 2. Subscribe to live
const subResult = await client.subscribe({
  fullTrackName,
  filterType: FilterType.LatestObject,
  forward: true,
  groupOrder: GroupOrder.Original,
  priority: 0
})

Pattern 3: Seek in History

Subscriber seeks to specific point in cached content:
// Fetch specific range
const fetchResult = await client.fetch({
  priority: 64,
  groupOrder: GroupOrder.Original,
  typeAndProps: {
    type: FetchType.StandAlone,
    props: {
      fullTrackName,
      startLocation: seekLocation,
      endLocation: new Location(seekLocation.group + 10n, 0n)
    }
  }
})

Cache Management Strategies

Fixed Window (Ring Buffer)

Best for consistent memory usage:
import { RingBufferObjectCache } from 'moqtail-ts'

// Keep last 1 hour, ~5400 objects for 30fps 2s GOPs
const cache = new RingBufferObjectCache(5400)

Unlimited Growth (Memory Cache)

Best for complete session recording:
import { MemoryObjectCache } from 'moqtail-ts'

// Grows unbounded - monitor memory usage
const cache = new MemoryObjectCache()

// Periodically check size
if (cache.size() > MAX_ACCEPTABLE_SIZE) {
  console.warn('Cache growing too large')
}

Time-Based Eviction

Implement custom cache with time-based cleanup:
class TimeBasedObjectCache implements ObjectCache {
  private objects: Array<{ obj: MoqtObject; timestamp: number }> = []
  private maxAgeMs: number
  
  constructor(maxAgeMs: number) {
    this.maxAgeMs = maxAgeMs
  }
  
  add(obj: MoqtObject): void {
    this.objects.push({ obj, timestamp: Date.now() })
    this.evictOld()
  }
  
  private evictOld(): void {
    const cutoff = Date.now() - this.maxAgeMs
    this.objects = this.objects.filter(item => item.timestamp >= cutoff)
  }
  
  // Implement other ObjectCache methods...
}

// Use for 30-minute sliding window
const cache = new TimeBasedObjectCache(30 * 60 * 1000)

Synchronizing Cache and Stream

Ensure objects are added to cache before being streamed:
const liveStream = new ReadableStream<MoqtObject>({
  async start(controller) {
    while (capturing) {
      const object = await generateObject()
      
      // IMPORTANT: Add to cache first
      cache.add(object)
      
      // Then stream to live subscribers
      controller.enqueue(object)
    }
  }
})
Always add objects to the cache before enqueueing them to the live stream. This ensures fetch requests can retrieve recently streamed objects.

Multi-Quality Hybrid Tracks

Publish multiple quality levels with shared cache:
class MultiQualityPublisher {
  private caches = new Map<string, RingBufferObjectCache>()
  
  async publishQualities(
    client: MOQtailClient,
    namespace: string[],
    qualities: Array<{ name: string; bitrate: number; resolution: string }>
  ) {
    for (const quality of qualities) {
      const cache = new RingBufferObjectCache(5400)
      this.caches.set(quality.name, cache)
      
      const stream = this.createEncoderStream(quality)
      const hybridSource = new HybridTrackSource(cache, stream)
      
      const track: Track = {
        fullTrackName: FullTrackName.tryNew(
          namespace.join('/'),
          quality.name
        ),
        forwardingPreference: ObjectForwardingPreference.Subgroup,
        trackSource: hybridSource,
        publisherPriority: quality.bitrate < 1_000_000 ? 16 : 0
      }
      
      client.addOrUpdateTrack(track)
    }
    
    await client.publishNamespace(namespace)
  }
  
  private createEncoderStream(quality: any): ReadableStream<MoqtObject> {
    // Create encoder for this quality level
    // Implementation omitted for brevity
    return new ReadableStream<MoqtObject>({/* ... */})
  }
}

// Usage
const publisher = new MultiQualityPublisher()
await publisher.publishQualities(client, ['live', 'event'], [
  { name: 'high', bitrate: 5_000_000, resolution: '1920x1080' },
  { name: 'medium', bitrate: 2_000_000, resolution: '1280x720' },
  { name: 'low', bitrate: 500_000, resolution: '640x360' }
])

Best Practices

Cache Sizing

Calculate cache size based on your DVR window:
const dvrSeconds = 3600 // 1 hour
const gopDurationSeconds = 2
const framesPerGop = 60
const cacheSize = Math.ceil(
  (dvrSeconds / gopDurationSeconds) * framesPerGop
)

Memory Monitoring

// Monitor memory usage
setInterval(() => {
  const used = process.memoryUsage().heapUsed / 1024 / 1024
  console.log(`Memory: ${used.toFixed(2)} MB`)
  console.log(`Cache size: ${cache.size()} objects`)
}, 10000)

Graceful Degradation

// Fall back to live-only on memory pressure
if (memoryUsage > MEMORY_THRESHOLD) {
  console.warn('High memory - switching to live-only')
  cache.clear()
  // Continue streaming without caching
}

Cache Warmup

Pre-populate cache with recent content when starting:
async function warmupCache(
  cache: ObjectCache,
  recentObjects: MoqtObject[]
) {
  console.log(`Warming cache with ${recentObjects.length} objects`)
  for (const obj of recentObjects) {
    cache.add(obj)
  }
}

Next Steps

Namespaces

Learn how to organize and announce tracks

Creating Tracks

Review track creation fundamentals