Overview

Content sources provide objects to tracks. MOQtail supports three types of content sources:
  • Live sources: Real-time streaming content
  • Static sources: Pre-recorded or cached content
  • Hybrid sources: Combination of live and cached content
The TrackSource interface lets a track expose live content, cached content, or both at the same time.

Content Source Architecture

export interface TrackSource {
  /** Historical object access (optional) */
  readonly past?: PastObjectSource
  /** Live object feed (optional) */
  readonly live?: LiveObjectSource
}
At least one of past or live must be present for a valid track.
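
The "at least one of past or live" rule can be checked before a track is registered. The sketch below is illustrative only: `classifyTrackSource` and `SourceKind` are hypothetical names that are not part of MOQtail, and the two source interfaces are reduced to minimal stand-ins.

```typescript
// Minimal stand-ins for the real interfaces (assumed shapes, for illustration only)
interface PastObjectSource { getRange(): Promise<unknown[]> }
interface LiveObjectSource { stop(): void }

interface TrackSource {
  readonly past?: PastObjectSource
  readonly live?: LiveObjectSource
}

type SourceKind = 'live' | 'static' | 'hybrid'

// Hypothetical helper: name the kind of source, rejecting the empty case
function classifyTrackSource(source: TrackSource): SourceKind {
  if (source.past && source.live) return 'hybrid'
  if (source.live) return 'live'
  if (source.past) return 'static'
  throw new Error('TrackSource must define at least one of past or live')
}
```

Calling a check like this when a track is built surfaces an empty `trackSource` early, instead of at the first subscribe or fetch.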

Live Content Sources

Live sources provide real-time objects as they are created.

LiveObjectSource Interface

export interface LiveObjectSource {
  /** Continuous stream yielding objects as they are produced */
  readonly stream: ReadableStream<MoqtObject>
  
  /** Highest (latest) location observed so far; undefined until first object */
  readonly largestLocation: Location | undefined
  
  /** Register a listener invoked for each new object */
  onNewObject(listener: (obj: MoqtObject) => void): () => void
  
  /** Register a listener invoked when stream ends */
  onDone(listener: () => void): () => void
  
  /** Stop ingestion and release reader */
  stop(): void
}

Creating Live Sources

import { LiveTrackSource, MoqtObject, Location, ObjectForwardingPreference } from 'moqtail'

// Timer handle shared by start() and cancel()
let interval: ReturnType<typeof setInterval> | undefined

// Create ReadableStream of objects
const liveStream = new ReadableStream<MoqtObject>({
  start(controller) {
    let groupId = 0n
    let objectId = 0n

    // Simulate video capture at 30fps (the callback must be async to await the capture)
    interval = setInterval(async () => {
      try {
        const frame = await captureVideoFrame()
        const obj = MoqtObject.newWithPayload(
          fullTrackName,
          new Location(groupId, objectId++),
          128,
          ObjectForwardingPreference.Subgroup,
          0n,
          null,
          frame
        )

        controller.enqueue(obj)

        // New GOP every 30 frames
        if (objectId === 30n) {
          groupId++
          objectId = 0n
        }
      } catch (error) {
        // Stop capturing before surfacing the error to the consumer
        clearInterval(interval)
        controller.error(error)
      }
    }, 33)  // ~30fps
  },

  // Cleanup on cancel: the value returned from start() is not used as a cleanup hook
  cancel() {
    clearInterval(interval)
  }
})

// Wrap in LiveTrackSource
const liveSource = new LiveTrackSource(liveStream)

// Use in track
const track: Track = {
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource: { live: liveSource },
  publisherPriority: 0
}

Live Source Implementation

The LiveTrackSource class handles stream ingestion:
export class LiveTrackSource implements LiveObjectSource {
  readonly stream: ReadableStream<MoqtObject>
  readonly #listeners = new Set<(obj: MoqtObject) => void>()
  readonly #doneListeners = new Set<() => void>()
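  #largestLocation: Location | undefined
  #reader?: ReadableStreamDefaultReader<MoqtObject>
  // Cleared by stop(); declared here so the excerpt compiles
  #ingestActive = true

  constructor(stream: ReadableStream<MoqtObject>) {
    this.stream = stream
    this.#startIngest()  // private read loop; its body is omitted from this excerpt
  }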

  get largestLocation(): Location | undefined {
    return this.#largestLocation
  }

  onNewObject(listener: (obj: MoqtObject) => void): () => void {
    this.#listeners.add(listener)
    return () => this.#listeners.delete(listener)
  }

  onDone(listener: () => void): () => void {
    this.#doneListeners.add(listener)
    return () => this.#doneListeners.delete(listener)
  }

  stop(): void {
    this.#ingestActive = false
    this.#reader?.cancel()
  }
}
  1. Stream reader consumes objects from the ReadableStream
  2. Location tracking maintains the largest location seen
  3. Listener notification invokes registered callbacks for each object
  4. Done handling notifies listeners when stream ends
  5. Async delivery ensures listeners don’t block ingestion
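
The five steps above can be sketched as a single read loop. This is not the library's `#startIngest` implementation, which is not shown on this page. It is a minimal sketch that assumes `MoqtObject` exposes a `location` with `group` and `object` fields and that listeners are dispatched on microtasks. `IngestSketch`, `isAfter`, and the `finished` promise are hypothetical names introduced for illustration.

```typescript
// Assumed minimal shapes; the real Location and MoqtObject carry more fields
interface Location { group: bigint; object: bigint }
interface MoqtObject { location: Location }

// True when a is strictly later than b (group first, then object)
function isAfter(a: Location, b: Location): boolean {
  return a.group > b.group || (a.group === b.group && a.object > b.object)
}

// Hypothetical class; not the library's LiveTrackSource
class IngestSketch {
  private listeners = new Set<(obj: MoqtObject) => void>()
  private doneListeners = new Set<() => void>()
  private largest: Location | undefined
  private reader: ReadableStreamDefaultReader<MoqtObject>
  /** Resolves when the read loop has ended (added to make the sketch observable) */
  readonly finished: Promise<void>

  constructor(stream: ReadableStream<MoqtObject>) {
    this.reader = stream.getReader()
    this.finished = this.ingest()
  }

  get largestLocation(): Location | undefined {
    return this.largest
  }

  onNewObject(listener: (obj: MoqtObject) => void): () => void {
    this.listeners.add(listener)
    return () => { this.listeners.delete(listener) }
  }

  onDone(listener: () => void): () => void {
    this.doneListeners.add(listener)
    return () => { this.doneListeners.delete(listener) }
  }

  stop(): void {
    // Cancelling the reader makes the pending read() resolve with done: true
    void this.reader.cancel()
  }

  private async ingest(): Promise<void> {
    try {
      while (true) {
        // 1. Consume the next object from the stream
        const { done, value } = await this.reader.read()
        if (done) break
        // 2. Track the largest location seen so far
        if (!this.largest || isAfter(value.location, this.largest)) {
          this.largest = value.location
        }
        // 3 and 5. Notify listeners on a microtask so a throwing listener cannot break the loop
        this.listeners.forEach((listener) => {
          queueMicrotask(() => {
            try { listener(value) } catch (err) { console.error('listener failed', err) }
          })
        })
      }
    } catch (err) {
      console.error('ingest failed', err)
    } finally {
      // 4. Notify done listeners once the stream has ended, errored, or been cancelled
      this.doneListeners.forEach((listener) => listener())
    }
  }
}
```

Listeners registered synchronously after construction still see the first object, because the loop yields at its first `await` before delivering anything.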

Live Source Patterns

Media Capture

function createCameraStream(fullTrackName: FullTrackName): LiveTrackSource {
  const stream = new ReadableStream<MoqtObject>({
    async start(controller) {
      const mediaStream = await navigator.mediaDevices.getUserMedia({
        video: { width: 1280, height: 720 }
      })
      
      const track = mediaStream.getVideoTracks()[0]
      const processor = new MediaStreamTrackProcessor({ track })
      const reader = processor.readable.getReader()
      
      let groupId = 0n
      let objectId = 0n
      
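      while (true) {
        const { done, value: frame } = await reader.read()
        if (done) break

        // A key frame opens a new group, so every group starts at a decodable point
        if (isKeyFrame(frame) && objectId > 0n) {
          groupId++
          objectId = 0n
        }

        const encoded = await encodeFrame(frame)
        const obj = MoqtObject.newWithPayload(
          fullTrackName,
          new Location(groupId, objectId++),
          128,
          ObjectForwardingPreference.Subgroup,
          0n,
          null,
          encoded
        )

        controller.enqueue(obj)
      }

      // The camera track ended: signal end-of-stream to the consumer
      controller.close()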
    }
  })
  
  return new LiveTrackSource(stream)
}

Sensor Data

function createSensorStream(fullTrackName: FullTrackName): LiveTrackSource {
  // Timer handle shared by start() and cancel()
  let timer: ReturnType<typeof setInterval> | undefined

  const stream = new ReadableStream<MoqtObject>({
    start(controller) {
      let groupId = 0n
      let objectId = 0n

      // Simulate sensor readings
      timer = setInterval(() => {
        const reading = {
          temperature: Math.random() * 30 + 10,
          humidity: Math.random() * 100,
          timestamp: Date.now()
        }

        const payload = new TextEncoder().encode(JSON.stringify(reading))
        const obj = MoqtObject.newWithPayload(
          fullTrackName,
          new Location(groupId, objectId++),
          128,
          ObjectForwardingPreference.Datagram,  // Low latency
          null,
          null,
          payload
        )

        controller.enqueue(obj)

        // New group every 100 readings
        if (objectId === 100n) {
          groupId++
          objectId = 0n
        }
      }, 100)  // 10Hz
    },

    // Stop the timer when the consumer cancels the stream
    cancel() {
      clearInterval(timer)
    }
  })
  
  return new LiveTrackSource(stream)
}

Static Content Sources

Static sources provide cached or pre-recorded content.

PastObjectSource Interface

export interface PastObjectSource {
  /** Underlying cache from which objects are served */
  readonly cache: ObjectCache
  
  /** 
   * Fetch a range of objects. Start/end are inclusive.
   * Omitted bounds mean from earliest or to latest cached.
   */
  getRange(start?: Location, end?: Location): Promise<MoqtObject[]>
}

Object Caches

Two cache implementations are provided:

MemoryObjectCache

Unlimited in-memory storage with binary search:
export class MemoryObjectCache implements ObjectCache {
  private objects: MoqtObject[] = []

  add(obj: MoqtObject): void {
    const idx = this._findInsertIndex(obj)
    this.objects.splice(idx, 0, obj)
  }

  getRange(start?: Location, end?: Location): MoqtObject[] {
    const startIdx = start ? this._findIndex(start, false) : 0
    const endIdx = end ? this._findIndex(end, true) : this.objects.length
    return this.objects.slice(startIdx, endIdx)
  }

  getByLocation(location: Location): MoqtObject | undefined {
    const idx = this._findIndex(location, false)
    const obj = this.objects[idx]
    if (obj && obj.groupId === location.group && obj.objectId === location.object) {
      return obj
    }
    return undefined
  }

  size(): number {
    return this.objects.length
  }

  clear(): void {
    this.objects = []
  }
}
MemoryObjectCache maintains objects in sorted order by location, enabling efficient range queries via binary search.
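
The private `_findIndex` helper is not shown in the excerpt above. The following is a minimal sketch of how a binary search over location-sorted entries can serve inclusive range queries. `compareLocations`, `findIndex`, and `getRange` are illustrative names, and the search operates on bare `Location` values rather than full objects.

```typescript
// Assumed minimal shape of a location
interface Location { group: bigint; object: bigint }

// Orders by group first, then by object within the group
function compareLocations(a: Location, b: Location): number {
  if (a.group !== b.group) return a.group < b.group ? -1 : 1
  if (a.object !== b.object) return a.object < b.object ? -1 : 1
  return 0
}

// Lower bound (first index >= target) by default;
// upper bound (first index > target) when `after` is true
function findIndex(sorted: Location[], target: Location, after: boolean): number {
  let lo = 0
  let hi = sorted.length
  while (lo < hi) {
    const mid = (lo + hi) >>> 1
    const cmp = compareLocations(sorted[mid], target)
    if (cmp < 0 || (after && cmp === 0)) lo = mid + 1
    else hi = mid
  }
  return lo
}

// Inclusive range: lower bound of start up to the upper bound of end
function getRange(sorted: Location[], start?: Location, end?: Location): Location[] {
  const from = start ? findIndex(sorted, start, false) : 0
  const to = end ? findIndex(sorted, end, true) : sorted.length
  return sorted.slice(from, to)
}
```

Using the upper bound for `end` is what makes the range inclusive. A `start` that is not in the cache resolves to the next later entry rather than failing.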

RingBufferObjectCache

Fixed-size cache with automatic eviction:
export class RingBufferObjectCache implements ObjectCache {
  private buffer: MoqtObject[]
  private maxSize: number

  constructor(maxSize: number = 100) {
    this.buffer = []
    this.maxSize = maxSize
  }

  add(obj: MoqtObject): void {
    const idx = this._findInsertIndex(obj)
    this.buffer.splice(idx, 0, obj)
    
    // Evict oldest if over capacity
    if (this.buffer.length > this.maxSize) {
      this.buffer.shift()
    }
  }

  // ... similar methods to MemoryObjectCache
}
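
One consequence of inserting in sorted order and then evicting from the front is easy to miss: an object older than everything in a full buffer is dropped as soon as it is added. The sketch below reproduces the add-then-evict logic on bare locations to show this. `BoundedSortedBuffer` is a hypothetical stand-in, not the library class, and it uses a linear scan from the tail instead of the binary search the real cache may use.

```typescript
// Assumed minimal shape of a location
interface Location { group: bigint; object: bigint }

function isBefore(a: Location, b: Location): boolean {
  return a.group < b.group || (a.group === b.group && a.object < b.object)
}

// Hypothetical stand-in for a capacity-bounded, location-sorted cache
class BoundedSortedBuffer {
  private items: Location[] = []
  private maxSize: number

  constructor(maxSize: number) {
    this.maxSize = maxSize
  }

  add(loc: Location): void {
    // Walk back from the tail: in-order arrivals are placed without any shifting
    let i = this.items.length
    while (i > 0 && isBefore(loc, this.items[i - 1])) i--
    this.items.splice(i, 0, loc)
    // Evict the oldest entry once capacity is exceeded
    if (this.items.length > this.maxSize) this.items.shift()
  }

  snapshot(): Location[] {
    return this.items.slice()
  }
}
```

With a capacity of 3, adding (0,0), (0,1), (0,2), (1,0) leaves (0,1), (0,2), (1,0). Adding (0,0) again afterwards changes nothing, because it sorts to the front and is evicted immediately.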

Creating Static Sources

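import {
  StaticTrackSource,
  MemoryObjectCache,
  MoqtObject,
  Location,
  FullTrackName,
  ObjectForwardingPreference
} from 'moqtail'

// Name shared by the cached objects and the track
const fullTrackName = FullTrackName.tryNew('vod/content', 'movie')

// Create cache
const cache = new MemoryObjectCache()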

// Populate with pre-recorded content
for (let g = 0n; g < 100n; g++) {
  for (let o = 0n; o < 30n; o++) {
    const obj = MoqtObject.newWithPayload(
      fullTrackName,
      new Location(g, o),
      128,
      ObjectForwardingPreference.Subgroup,
      0n,
      null,
      loadFrameData(g, o)
    )
    cache.add(obj)
  }
}

// Create static source
const staticSource = new StaticTrackSource(cache)

// Use in track
const vodTrack: Track = {
  fullTrackName: FullTrackName.tryNew('vod/content', 'movie'),
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource: { past: staticSource },
  publisherPriority: 64
}

Static Source Implementation

export class StaticTrackSource implements PastObjectSource {
  readonly cache: ObjectCache

  constructor(cache: ObjectCache) {
    this.cache = cache
  }

  async getRange(start?: Location, end?: Location): Promise<MoqtObject[]> {
    return this.cache.getRange(start, end)
  }
}

Static Source Patterns

VOD Content

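import path from 'node:path'

async function loadVODTrack(filePath: string): Promise<Track> {
  // Objects and track share one name derived from the file
  const fullTrackName = FullTrackName.tryNew('vod/movies', path.basename(filePath))
  const cache = new MemoryObjectCache()
  const file = await loadMediaFile(filePath)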
  
  // Parse and cache segments
  const segments = parseMediaSegments(file)
  for (const [groupId, segment] of segments.entries()) {
    const frames = extractFrames(segment)
    for (const [objectId, frame] of frames.entries()) {
      const obj = MoqtObject.newWithPayload(
        fullTrackName,
        new Location(BigInt(groupId), BigInt(objectId)),
        128,
        ObjectForwardingPreference.Subgroup,
        0n,
        null,
        frame.data
      )
      cache.add(obj)
    }
  }
  
  return {
    fullTrackName: FullTrackName.tryNew('vod/movies', path.basename(filePath)),
    forwardingPreference: ObjectForwardingPreference.Subgroup,
    trackSource: { past: new StaticTrackSource(cache) },
    publisherPriority: 64
  }
}

File Transfer

async function createFileTransferTrack(file: File): Promise<Track> {
  const fullTrackName = FullTrackName.tryNew('files/transfer', file.name)
  const cache = new MemoryObjectCache()
  const chunkSize = 64 * 1024  // 64KB chunks
  const groupSize = 10 * 1024 * 1024  // 10MB per group (160 chunks)

  let groupId = 0n
  let objectId = 0n

  for (let offset = 0; offset < file.size; offset += chunkSize) {
    const chunk = file.slice(offset, offset + chunkSize)
    const obj = MoqtObject.newWithPayload(
      fullTrackName,
      new Location(groupId, objectId++),
      128,
      ObjectForwardingPreference.Subgroup,
      0n,
      null,
      new Uint8Array(await chunk.arrayBuffer())  // requires an async function
    )
    cache.add(obj)

    // New group once the next chunk starts on a 10MB boundary
    if ((offset + chunkSize) % groupSize === 0) {
      groupId++
      objectId = 0n
    }
  }

  return {
    fullTrackName,
    forwardingPreference: ObjectForwardingPreference.Subgroup,
    trackSource: { past: new StaticTrackSource(cache) },
    publisherPriority: 128
  }
}
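
The intended mapping from byte offset to location (64KB chunks, a new group every 10MB) can also be written as a pure function, which makes the group boundaries easy to verify. `chunkLocation` and `chunkCount` are hypothetical helpers introduced for illustration. The constants mirror the values used in the example above.

```typescript
const CHUNK_SIZE = 64 * 1024                       // 64KB per object
const GROUP_BYTES = 10 * 1024 * 1024               // 10MB per group
const CHUNKS_PER_GROUP = GROUP_BYTES / CHUNK_SIZE  // 160 objects per group

// Location of the chunk that starts at the given byte offset
function chunkLocation(offset: number): { group: bigint; object: bigint } {
  const index = Math.floor(offset / CHUNK_SIZE)
  return {
    group: BigInt(Math.floor(index / CHUNKS_PER_GROUP)),
    object: BigInt(index % CHUNKS_PER_GROUP)
  }
}

// Number of objects needed for a file of the given size (the last chunk may be short)
function chunkCount(fileSize: number): number {
  return Math.ceil(fileSize / CHUNK_SIZE)
}
```

Because 10MB is an exact multiple of 64KB, each full group holds 160 objects: byte offset 0 maps to group 0, object 0, and the chunk starting at the 10MB mark maps to group 1, object 0.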

Hybrid Content Sources

Hybrid sources combine live and cached content for catch-up scenarios.

HybridTrackSource Implementation

export class HybridTrackSource implements TrackSource {
  readonly past: PastObjectSource
  readonly live: LiveObjectSource

  constructor(cache: ObjectCache, stream: ReadableStream<MoqtObject>) {
    this.past = new StaticTrackSource(cache)
    this.live = new LiveTrackSource(stream)
  }
}

Creating Hybrid Sources

import {
  HybridTrackSource,
  RingBufferObjectCache,
  MoqtObject,
  Location,
  FullTrackName,
  ObjectForwardingPreference
} from 'moqtail'

// Create ring buffer for recent history
const cache = new RingBufferObjectCache(300)  // Last 10 seconds @ 30fps

// Create live stream that also populates cache
const liveStream = new ReadableStream<MoqtObject>({
  start(controller) {
    let groupId = 0n
    let objectId = 0n
    
    setInterval(() => {
      const frame = captureFrame()
      const obj = MoqtObject.newWithPayload(
        fullTrackName,
        new Location(groupId, objectId++),
        128,
        ObjectForwardingPreference.Subgroup,
        0n,
        null,
        frame
      )
      
      // Add to cache AND stream
      cache.add(obj)
      controller.enqueue(obj)
      
      if (objectId === 30n) {
        groupId++
        objectId = 0n
      }
    }, 33)
  }
})

// Create hybrid source
const hybridSource = new HybridTrackSource(cache, liveStream)

// Use in track
const track: Track = {
  fullTrackName: FullTrackName.tryNew('live/stream', 'video'),
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource: {
    past: hybridSource.past,
    live: hybridSource.live
  },
  publisherPriority: 0
}

Hybrid Source Patterns

Catch-up TV

function createCatchUpStream(): HybridTrackSource {
  // Cache last 2 hours
  const cache = new RingBufferObjectCache(216000)  // 2hrs @ 30fps
  
  const liveStream = new ReadableStream<MoqtObject>({
    async start(controller) {
      const mediaStream = await getLiveBroadcast()
      // ... encode and stream frames
      // Add each frame to cache as it's streamed
    }
  })
  
  return new HybridTrackSource(cache, liveStream)
}

// Subscribers can:
// 1. Fetch recent history: await client.fetch(...)
// 2. Then subscribe to live: await client.subscribe(...)
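
When a subscriber combines a fetch of recent history with a live subscription, the two result sets can overlap around the join point. How `client.fetch` and `client.subscribe` deliver objects is not covered here, so the sketch below only illustrates the stitching step on plain arrays. `stitchCatchUp` is a hypothetical helper, and it assumes each object exposes a `location` with `group` and `object` fields.

```typescript
// Assumed minimal shapes; the real types carry more fields
interface Location { group: bigint; object: bigint }
interface MoqtObject { location: Location }

function locationKey(loc: Location): string {
  return `${loc.group}:${loc.object}`
}

// Concatenate fetched history and buffered live objects, dropping duplicates by location.
// Both inputs are assumed to be in ascending location order already.
function stitchCatchUp(history: MoqtObject[], live: MoqtObject[]): MoqtObject[] {
  const seen = new Set<string>()
  const result: MoqtObject[] = []
  for (const obj of history.concat(live)) {
    const key = locationKey(obj.location)
    if (seen.has(key)) continue
    seen.add(key)
    result.push(obj)
  }
  return result
}
```

Deduplicating by location keeps playback continuous when the last fetched object is also the first one delivered live.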

DVR Functionality

function createDVRTrack(): Track {
  const cache = new MemoryObjectCache()  // Unlimited history
  
  const liveStream = new ReadableStream<MoqtObject>({
    start(controller) {
      // Stream live content while building cache
      setInterval(() => {
        const obj = captureLiveFrame()
        cache.add(obj)
        controller.enqueue(obj)
      }, 33)
    }
  })
  
  return {
    fullTrackName: FullTrackName.tryNew('live/dvr', 'stream'),
    forwardingPreference: ObjectForwardingPreference.Subgroup,
    trackSource: new HybridTrackSource(cache, liveStream),
    publisherPriority: 0
  }
}
Choose the cache type and size according to how far back subscribers need to catch up:
Small cache (RingBufferObjectCache with 100-500 objects):
  • Short catch-up window (few seconds)
  • Low memory footprint
  • Good for: Real-time communication, live sports
Medium cache (RingBufferObjectCache with 5000-50000 objects):
  • Medium catch-up window (minutes to hours)
  • Reasonable memory usage
  • Good for: Live events, news, broadcasts
Large cache (MemoryObjectCache, unlimited):
  • Full history available
  • High memory usage
  • Good for: DVR, replay, complete archives
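
The capacities used in the examples on this page follow from simple arithmetic: window length in seconds multiplied by objects per second. The helpers below are hypothetical and assume a constant object rate with one object per frame.

```typescript
// Objects needed to retain `windowSeconds` of history at a constant object rate
function ringBufferCapacity(windowSeconds: number, objectsPerSecond: number): number {
  return Math.ceil(windowSeconds * objectsPerSecond)
}

// Catch-up window, in seconds, that a given capacity provides
function catchUpWindowSeconds(capacity: number, objectsPerSecond: number): number {
  return capacity / objectsPerSecond
}

// Rough payload memory estimate; ignores per-object overhead
function estimatedPayloadBytes(capacity: number, averageObjectBytes: number): number {
  return capacity * averageObjectBytes
}

const tenSeconds = ringBufferCapacity(10, 30)        // 300, as in the hybrid example
const twoHours = ringBufferCapacity(2 * 60 * 60, 30) // 216000, as in the catch-up example
```

Under these assumptions, 50000 objects at 30 objects per second cover roughly 28 minutes, so an hours-long window at that rate needs a larger buffer, as in the 216000-object catch-up example.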

Content Source Lifecycle

Publisher Side

// 1. Create content source
const liveSource = new LiveTrackSource(liveStream)

// 2. Register listeners (optional)
const unsubscribe = liveSource.onNewObject((obj) => {
  console.log('New object:', obj.location)
})

// 3. Create track with source
const track: Track = {
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource: { live: liveSource },
  publisherPriority: 0
}

// 4. Add to client
client.addOrUpdateTrack(track)

// 5. Cleanup when done
unsubscribe()
liveSource.stop()
client.removeTrack(track)

Subscriber Side

Subscribers don’t interact with content sources directly. They receive objects through subscriptions and fetches managed by the MOQtailClient.

Learn More

Explore the complete Track API reference