Content Source

Content sources define where track data comes from: a live stream, a historical cache, or both. The TrackSource interface and its implementations provide the foundation for track publishing in MOQtail.

Overview

A TrackSource can provide:
  • Live objects via LiveObjectSource - Real-time streaming from a ReadableStream
  • Past objects via PastObjectSource - Historical retrieval from an ObjectCache
  • Both - Hybrid live streaming with catch-up support

TrackSource Interface

interface TrackSource {
  readonly past?: PastObjectSource;
  readonly live?: LiveObjectSource;
}
past
PastObjectSource
Optional historical object access via cache-backed range queries
live
LiveObjectSource
Optional live object feed via streaming
At least one of past or live must be provided. Tracks can support:
  • VOD/static: Only past
  • Pure live: Only live
  • Hybrid: Both past and live
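The three modes above can be told apart by checking which fields are present. The following is a minimal sketch, not part of the library: `TrackSourceLike`, `PastLike`, `LiveLike`, and `classifySource` are hypothetical stand-ins that keep only enough shape to show the "at least one of past or live" rule.

```typescript
// Stand-in shapes for illustration only; the real interfaces carry more members.
interface PastLike { getRange(): Promise<unknown[]> }
interface LiveLike { stop(): void }
interface TrackSourceLike {
  readonly past?: PastLike;
  readonly live?: LiveLike;
}

type SourceKind = 'static' | 'live' | 'hybrid';

// Hypothetical helper: classify a source and reject the invalid "neither" case.
function classifySource(source: TrackSourceLike): SourceKind {
  if (source.past && source.live) return 'hybrid';
  if (source.past) return 'static';
  if (source.live) return 'live';
  throw new Error('TrackSource must provide at least one of past or live');
}

const pastOnly: TrackSourceLike = { past: { getRange: async () => [] } };
const liveOnly: TrackSourceLike = { live: { stop: () => {} } };

console.log(classifySource(pastOnly)); // static
console.log(classifySource(liveOnly)); // live
```

A check like this could run before a track is registered, so that an empty source object fails early rather than at subscribe time.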

LiveObjectSource

Provides a push-oriented live object feed backed by a ReadableStream.

Interface

interface LiveObjectSource {
  readonly stream: ReadableStream<MoqtObject>;
  readonly largestLocation: Location | undefined;
  onNewObject(listener: (obj: MoqtObject) => void): () => void;
  onDone(listener: () => void): () => void;
  stop(): void;
}

Properties

stream
ReadableStream<MoqtObject>
required
Continuous stream yielding objects as they are produced
largestLocation
Location | undefined
Highest (latest) location observed so far; undefined until the first object arrives

Methods

onNewObject()

Registers a listener invoked for each new object.
onNewObject(listener: (obj: MoqtObject) => void): () => void
listener
(obj: MoqtObject) => void
required
Function called asynchronously for each new object
return
() => void
Unsubscribe function to remove the listener

onDone()

Registers a listener invoked when the live stream ends.
onDone(listener: () => void): () => void
listener
() => void
required
Function called when the stream ends, whether normally or because of an error
return
() => void
Unsubscribe function to remove the listener

stop()

Stops ingestion and releases the underlying reader. Idempotent: calling it more than once has no further effect.
stop(): void
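To make the listener contract concrete, here is a simplified sketch of the bookkeeping behind onNewObject(), onDone(), largestLocation, and an idempotent stop(). It is not the library's implementation. `LiveFeedSketch`, `Loc`, `Obj`, `compareLoc`, and `push` are hypothetical names, and the stream reader is left out so the logic stays synchronous. The sketch also assumes that stop() notifies onDone listeners, which the interface description does not state.

```typescript
// Simplified stand-ins; field names are assumptions, not the library's types.
interface Loc { group: number; object: number }
interface Obj { location: Loc }

// Order by group first, then by object within the group.
function compareLoc(a: Loc, b: Loc): number {
  return a.group !== b.group ? a.group - b.group : a.object - b.object;
}

class LiveFeedSketch {
  largestLocation: Loc | undefined = undefined;
  private objectListeners = new Set<(obj: Obj) => void>();
  private doneListeners = new Set<() => void>();
  private stopped = false;

  // Returns an unsubscribe function that removes exactly this listener.
  onNewObject(listener: (obj: Obj) => void): () => void {
    this.objectListeners.add(listener);
    return () => { this.objectListeners.delete(listener); };
  }

  onDone(listener: () => void): () => void {
    this.doneListeners.add(listener);
    return () => { this.doneListeners.delete(listener); };
  }

  // Stand-in for the ingestion loop: called once per object read from the stream.
  push(obj: Obj): void {
    if (this.stopped) return;
    if (!this.largestLocation || compareLoc(obj.location, this.largestLocation) > 0) {
      this.largestLocation = obj.location;
    }
    this.objectListeners.forEach((listener) => listener(obj));
  }

  // Idempotent: the second and later calls are no-ops.
  stop(): void {
    if (this.stopped) return;
    this.stopped = true;
    this.doneListeners.forEach((listener) => listener());
  }
}

const feed = new LiveFeedSketch();
const seenGroups: number[] = [];
let doneCount = 0;

const unsubscribe = feed.onNewObject((obj) => { seenGroups.push(obj.location.group); });
feed.onDone(() => { doneCount++; });

feed.push({ location: { group: 0, object: 0 } });
feed.push({ location: { group: 1, object: 0 } });
unsubscribe();                                     // listener no longer called
feed.push({ location: { group: 2, object: 0 } });  // still advances largestLocation
feed.stop();
feed.stop();                                       // no effect the second time
```

In this sketch listeners run synchronously inside push(); a real source may dispatch them asynchronously, as the onNewObject() description indicates.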

PastObjectSource

Provides historical object access via cache-backed range queries.

Interface

interface PastObjectSource {
  readonly cache: ObjectCache;
  getRange(start?: Location, end?: Location): Promise<MoqtObject[]>;
}

Properties

cache
ObjectCache
required
Underlying cache from which objects are served. See ObjectCache.

Methods

getRange()

Fetches a closed range of objects: both the start and end locations are inclusive.
getRange(start?: Location, end?: Location): Promise<MoqtObject[]>
start
Location
Inclusive start location. If omitted, returns from earliest cached object.
end
Location
Inclusive end location. If omitted, returns up to latest cached object.
return
Promise<MoqtObject[]>
Objects in ascending location order. May be empty if the requested window is outside the cached range.
Implementations MUST return objects in ascending location order.
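The range semantics described above (inclusive bounds, open-ended when a bound is omitted, ascending order) can be sketched over a plain sorted array. This is an illustration under assumptions, not the cache's actual lookup: `Loc`, `Obj`, `compareLoc`, and `selectRange` are hypothetical, and locations are modeled as a simple group/object pair of numbers.

```typescript
// Simplified stand-ins; field names are assumptions, not the library's types.
interface Loc { group: number; object: number }
interface Obj { location: Loc; payload: string }

// Order by group first, then by object within the group.
function compareLoc(a: Loc, b: Loc): number {
  return a.group !== b.group ? a.group - b.group : a.object - b.object;
}

// Both bounds are inclusive; an omitted bound leaves that side open.
// Input is assumed to be sorted ascending, so the output is too.
function selectRange(sorted: Obj[], start?: Loc, end?: Loc): Obj[] {
  return sorted.filter((obj) =>
    (start === undefined || compareLoc(obj.location, start) >= 0) &&
    (end === undefined || compareLoc(obj.location, end) <= 0));
}

// Five cached objects at locations (0,0) through (4,0).
const cached: Obj[] = [0, 1, 2, 3, 4].map((g) => ({
  location: { group: g, object: 0 },
  payload: `object ${g}`,
}));

const middle = selectRange(cached, { group: 2, object: 0 }, { group: 3, object: 0 });
const everything = selectRange(cached);
const outside = selectRange(cached, { group: 10, object: 0 });

console.log(middle.length, everything.length, outside.length); // 2 5 0
```

A real cache would likely use an ordered index rather than a linear filter, but the boundary behavior is the part that callers depend on.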

Implementations

LiveTrackSource

Concrete implementation of LiveObjectSource.

Constructor

const liveSource = new LiveTrackSource(stream: ReadableStream<MoqtObject>)
stream
ReadableStream<MoqtObject>
required
Stream of MoqtObjects to publish live

Example

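import { LiveTrackSource, MoqtObject, Location, ObjectForwardingPreference } from 'moqtail';

// `fullTrackName` is assumed to be defined elsewhere in your code.
// Small delay helper used to pace the example.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));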

// Create a stream of objects (e.g., from video encoder)
const liveStream = new ReadableStream<MoqtObject>({
  async start(controller) {
    // Produce objects
    for (let i = 0; i < 100; i++) {
      const obj = MoqtObject.newWithPayload(
        fullTrackName,
        new Location(i, 0),
        0, // priority
        ObjectForwardingPreference.Subgroup,
        null,
        null,
        new TextEncoder().encode(`Object ${i}`)
      );
      controller.enqueue(obj);
      await sleep(33); // ~30fps
    }
    controller.close();
  }
});

const liveSource = new LiveTrackSource(liveStream);

// Listen to new objects
const unsubscribe = liveSource.onNewObject((obj) => {
  console.log('New object:', obj.groupId, obj.objectId);
});

// Listen for stream end
liveSource.onDone(() => {
  console.log('Live stream ended');
  unsubscribe();
});

StaticTrackSource

Concrete implementation of PastObjectSource.

Constructor

const staticSource = new StaticTrackSource(cache: ObjectCache)
cache
ObjectCache
required
ObjectCache instance containing historical objects

Example

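import { StaticTrackSource, MemoryObjectCache, MoqtObject, Location, ObjectForwardingPreference } from 'moqtail';

// `fullTrackName` is assumed to be defined elsewhere in your code.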

// Create and populate cache
const cache = new MemoryObjectCache();
for (let i = 0; i < 100; i++) {
  const obj = MoqtObject.newWithPayload(
    fullTrackName,
    new Location(i, 0),
    0,
    ObjectForwardingPreference.Subgroup,
    null,
    null,
    new TextEncoder().encode(`Cached object ${i}`)
  );
  cache.add(obj);
}

const staticSource = new StaticTrackSource(cache);

// Fetch range
const objects = await staticSource.getRange(
  new Location(10, 0),
  new Location(20, 0)
);
console.log(`Fetched ${objects.length} objects`);

HybridTrackSource

Combines both live and past sources.

Constructor

const hybridSource = new HybridTrackSource(
  cache: ObjectCache,
  stream: ReadableStream<MoqtObject>
)
cache
ObjectCache
required
Cache for historical objects
stream
ReadableStream<MoqtObject>
required
Live stream of new objects

Example

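import { HybridTrackSource, MemoryObjectCache, ObjectForwardingPreference } from 'moqtail';

// `client`, `fullTrackName`, and `createLiveStream()` are assumed to be defined elsewhere.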

const cache = new MemoryObjectCache();
const liveStream = createLiveStream();

// Create hybrid source
const hybridSource = new HybridTrackSource(cache, liveStream);

// Cache live objects as they arrive
hybridSource.live.onNewObject((obj) => {
  cache.add(obj);
});

// Use in track
client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource: hybridSource,
  publisherPriority: 0
});

Usage Patterns

Live-Only Publishing

import { LiveTrackSource, ObjectForwardingPreference, type TrackSource } from 'moqtail';

const liveStream = createVideoStream(); // Your stream source
const trackSource: TrackSource = {
  live: new LiveTrackSource(liveStream)
};

client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource,
  publisherPriority: 0
});

VOD/Static Content

import { StaticTrackSource, MemoryObjectCache, ObjectForwardingPreference, type TrackSource } from 'moqtail';

const cache = new MemoryObjectCache();
// Populate cache with pre-recorded content
recording.forEach(obj => cache.add(obj));

const trackSource: TrackSource = {
  past: new StaticTrackSource(cache)
};

client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource,
  publisherPriority: 64
});

Hybrid Live + Catch-Up

import { HybridTrackSource, RingBufferObjectCache, ObjectForwardingPreference, type TrackSource } from 'moqtail';

// Ring buffer keeps last N objects
const cache = new RingBufferObjectCache(500);
const liveStream = createLiveStream();

const hybrid = new HybridTrackSource(cache, liveStream);

// Auto-cache live objects
hybrid.live.onNewObject(obj => cache.add(obj));

const trackSource: TrackSource = hybrid;

client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource,
  publisherPriority: 8
});
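On the consuming side, catch-up means replaying cached history and then continuing with live objects without delivering anything twice. The sketch below shows one way that stitching could work. It is an assumption about how a consumer might combine the two sources, not a description of what MOQtail does internally. `Loc`, `Obj`, `compareLoc`, and `stitchCatchUp` are hypothetical names.

```typescript
// Simplified stand-ins; field names are assumptions, not the library's types.
interface Loc { group: number; object: number }
interface Obj { location: Loc }

// Order by group first, then by object within the group.
function compareLoc(a: Loc, b: Loc): number {
  return a.group !== b.group ? a.group - b.group : a.object - b.object;
}

// Hypothetical helper: keep the replayed history, then append only the live
// objects that come strictly after the last replayed location.
// Both inputs are assumed to be in ascending location order.
function stitchCatchUp(past: Obj[], live: Obj[]): Obj[] {
  if (past.length === 0) return live.slice();
  const lastReplayed = past[past.length - 1].location;
  const fresh = live.filter((obj) => compareLoc(obj.location, lastReplayed) > 0);
  return past.concat(fresh);
}

const at = (group: number): Obj => ({ location: { group, object: 0 } });

// History covers groups 0..2; the live feed was joined at group 2, so it overlaps.
const replayed = [at(0), at(1), at(2)];
const liveBuffered = [at(2), at(3)];

const timeline = stitchCatchUp(replayed, liveBuffered);
console.log(timeline.map((obj) => obj.location.group)); // [ 0, 1, 2, 3 ]
```

The overlap case matters because a hybrid source caches live objects as they arrive, so the newest cached object and the first live object seen by a late subscriber can be the same one.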

Manual TrackSource Object

Create a custom source:
const customSource: TrackSource = {
  live: {
    stream: liveStream,
    largestLocation: undefined, // placeholder: a real implementation should update this as objects arrive
    onNewObject: (listener) => { /* ... */ return () => {} },
    onDone: (listener) => { /* ... */ return () => {} },
    stop: () => { /* ... */ }
  },
  past: {
    cache: myCache,
    getRange: async (start, end) => {
      return myCache.getRange(start, end);
    }
  }
};

Stream Creation Patterns

From Generator

async function* generateObjects() {
  for (let i = 0; i < 1000; i++) {
    yield MoqtObject.newWithPayload(
      fullTrackName,
      new Location(i, 0),
      0,
      ObjectForwardingPreference.Subgroup,
      null,
      null,
      new TextEncoder().encode(`Frame ${i}`)
    );
    await sleep(33); // ~30fps
  }
}

// ReadableStream.from() is not available in every runtime; check support before relying on it.
const stream = ReadableStream.from(generateObjects());
const liveSource = new LiveTrackSource(stream);

From Event Source

const stream = new ReadableStream<MoqtObject>({
  start(controller) {
    encoder.addEventListener('frame', (event) => {
      const obj = encodeToMoqtObject(event.frame);
      controller.enqueue(obj);
    });
    
    encoder.addEventListener('end', () => {
      controller.close();
    });
  },
  cancel() {
    encoder.stop();
  }
});

const liveSource = new LiveTrackSource(stream);

From Transform

const rawStream = getRawDataStream();

const objectStream = rawStream.pipeThrough(
  new TransformStream<RawData, MoqtObject>({
    transform(chunk, controller) {
      const obj = convertToMoqtObject(chunk);
      controller.enqueue(obj);
    }
  })
);

const liveSource = new LiveTrackSource(objectStream);

Best Practices

Cache Live Objects: For hybrid sources, cache live objects as they’re produced:
hybridSource.live.onNewObject(obj => cache.add(obj));
Memory Management: Use RingBufferObjectCache for live streams to prevent unbounded memory growth:
const cache = new RingBufferObjectCache(1000); // Keep last 1000 objects
Stream Cleanup: Always call stop() on LiveObjectSource when done to release resources:
liveSource.stop();
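The memory-management advice relies on the cache evicting old entries once it reaches capacity. As a rough illustration of that behavior, here is a minimal bounded buffer. It is a sketch of the idea only and makes no claim about how RingBufferObjectCache is implemented. `BoundedBuffer` and its methods are hypothetical.

```typescript
// Hypothetical bounded buffer: keeps at most `capacity` items, dropping the oldest first.
class BoundedBuffer<T> {
  private items: T[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.capacity = capacity;
  }

  add(item: T): void {
    this.items.push(item);
    // Evict the oldest entry once the limit is exceeded.
    if (this.items.length > this.capacity) this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }

  // Copy of the retained items, oldest first.
  snapshot(): T[] {
    return this.items.slice();
  }
}

const recent = new BoundedBuffer<number>(3);
for (let i = 0; i < 10; i++) recent.add(i);

console.log(recent.size);       // 3
console.log(recent.snapshot()); // [ 7, 8, 9 ]
```

Array.prototype.shift() is linear in the buffer length, which is fine for a sketch; a production ring buffer would typically overwrite slots in a fixed-size array instead.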
