Content Source
Content sources define where track data comes from: live streams, historical caches, or both. The TrackSource interface and its implementations provide the foundation for track publishing in MOQtail.
Overview
A TrackSource can provide:
- Live objects via LiveObjectSource - Real-time streaming from a ReadableStream
- Past objects via PastObjectSource - Historical retrieval from an ObjectCache
- Both - Hybrid live streaming with catch-up support
TrackSource Interface
interface TrackSource {
  readonly past?: PastObjectSource;
  readonly live?: LiveObjectSource;
}
past
PastObjectSource
Optional historical object access via cache-backed range queries
live
LiveObjectSource
Optional live object feed via streaming
At least one of past or live must be provided. Tracks can support:
- VOD/static: Only past
- Pure live: Only live
- Hybrid: Both past and live
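The "at least one of past or live" rule can be checked at the point where a source is registered. The following is a minimal sketch, not part of the MOQtail API: `TrackSourceLike`, `PastLike`, `LiveLike`, and `classifyTrackSource` are hypothetical local stand-ins so that the example is self-contained.

```typescript
// Local stand-ins for the documented interfaces (assumption: the real types come from moqtail).
interface PastLike { getRange(start?: unknown, end?: unknown): Promise<unknown[]> }
interface LiveLike { stop(): void }
interface TrackSourceLike { readonly past?: PastLike; readonly live?: LiveLike }

type TrackSourceKind = 'static' | 'live' | 'hybrid';

// Hypothetical helper: reports which delivery modes a source supports
// and rejects a source that provides neither side.
function classifyTrackSource(source: TrackSourceLike): TrackSourceKind {
  if (source.past && source.live) return 'hybrid';
  if (source.live) return 'live';
  if (source.past) return 'static';
  throw new Error('TrackSource must provide at least one of past or live');
}

const pastOnly: TrackSourceLike = { past: { getRange: async () => [] } };
const liveOnly: TrackSourceLike = { live: { stop: () => {} } };
const bothSides: TrackSourceLike = { ...pastOnly, ...liveOnly };

console.log(classifyTrackSource(pastOnly)); // static
console.log(classifyTrackSource(liveOnly)); // live
console.log(classifyTrackSource(bothSides)); // hybrid
```

Failing early on an empty source gives a clearer error than a publish call that later finds nothing to serve.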
LiveObjectSource
Provides a push-oriented live object feed from a ReadableStream.
Interface
interface LiveObjectSource {
  readonly stream: ReadableStream<MoqtObject>;
  readonly largestLocation: Location | undefined;
  onNewObject(listener: (obj: MoqtObject) => void): () => void;
  onDone(listener: () => void): () => void;
  stop(): void;
}
Properties
stream
ReadableStream<MoqtObject>
required
Continuous stream yielding objects as they are produced
largestLocation
Location | undefined
Highest (latest) location observed so far; undefined until the first object arrives
Methods
onNewObject()
Registers a listener invoked for each new object.
onNewObject(listener: (obj: MoqtObject) => void): () => void
listener
(obj: MoqtObject) => void
required
Function called asynchronously for each new object
Returns: Unsubscribe function to remove the listener
onDone()
Registers a listener invoked when the live stream ends.
onDone(listener: () => void): () => void
listener
() => void
required
Function called when the stream ends (normally or with an error)
Returns: Unsubscribe function to remove the listener
stop()
Stops ingestion and releases the underlying reader. Idempotent, so repeated calls are safe.
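To make the listener contract concrete, here is a minimal sketch of the bookkeeping a LiveObjectSource implementation might keep. It is not the LiveTrackSource implementation: `LiveFanout`, `Loc`, `Item`, `push`, and `finish` are hypothetical names, locations use plain numbers, and listeners are invoked synchronously for simplicity.

```typescript
// Local stand-ins; the field names are assumptions, not the moqtail definitions.
interface Loc { group: number; object: number }
interface Item { location: Loc }

// Orders locations by group first, then by object within the group.
function compareLoc(a: Loc, b: Loc): number {
  return a.group !== b.group ? a.group - b.group : a.object - b.object;
}

class LiveFanout {
  largestLocation: Loc | undefined = undefined;
  private objectListeners = new Set<(obj: Item) => void>();
  private doneListeners = new Set<() => void>();
  private finished = false;

  // Registers a listener and returns a function that removes it again.
  onNewObject(listener: (obj: Item) => void): () => void {
    this.objectListeners.add(listener);
    return () => { this.objectListeners.delete(listener); };
  }

  onDone(listener: () => void): () => void {
    this.doneListeners.add(listener);
    return () => { this.doneListeners.delete(listener); };
  }

  // Called by whatever reads the underlying stream, once per object.
  push(obj: Item): void {
    if (this.finished) return;
    if (!this.largestLocation || compareLoc(obj.location, this.largestLocation) > 0) {
      this.largestLocation = obj.location;
    }
    // Iterate over a copy so a listener may unsubscribe itself while being called.
    for (const listener of [...this.objectListeners]) listener(obj);
  }

  // Called when the stream ends; done listeners fire at most once.
  finish(): void {
    if (this.finished) return;
    this.finished = true;
    for (const listener of [...this.doneListeners]) listener();
  }
}
```

Returning the unsubscribe function from the registration call keeps removal tied to the exact listener instance, so callers do not have to hold on to the function reference themselves.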
PastObjectSource
Provides historical object access via cache-backed range queries.
Interface
interface PastObjectSource {
  readonly cache: ObjectCache;
  getRange(start?: Location, end?: Location): Promise<MoqtObject[]>;
}
Properties
cache
ObjectCache
required
Underlying cache from which objects are served. See ObjectCache.
Methods
getRange()
Fetches a closed range of objects (both bounds inclusive).
getRange(start?: Location, end?: Location): Promise<MoqtObject[]>
start
Location
Inclusive start location. If omitted, returns from the earliest cached object.
end
Location
Inclusive end location. If omitted, returns up to the latest cached object.
Returns: Objects in ascending location order. May be empty if the requested window is outside the cached range.
Implementations MUST return objects in ascending location order.
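The inclusive-bounds semantics can be illustrated over a plain sorted array. This is a sketch under stated assumptions rather than the ObjectCache implementation: `Loc`, `Item`, `compareLoc`, and `getRangeInclusive` are hypothetical names, and locations use plain numbers for brevity.

```typescript
// Local stand-ins; the real Location and MoqtObject types come from moqtail.
interface Loc { group: number; object: number }
interface Item { location: Loc; payload: string }

// Orders locations by group first, then by object within the group.
function compareLoc(a: Loc, b: Loc): number {
  return a.group !== b.group ? a.group - b.group : a.object - b.object;
}

// Returns every item whose location lies in [start, end].
// A missing bound leaves that side of the range open.
// Assumes `sorted` is already in ascending location order, so the result is too.
function getRangeInclusive(sorted: Item[], start?: Loc, end?: Loc): Item[] {
  return sorted.filter((item) =>
    (start === undefined || compareLoc(item.location, start) >= 0) &&
    (end === undefined || compareLoc(item.location, end) <= 0));
}

const items: Item[] = Array.from({ length: 100 }, (_, i) => ({
  location: { group: i, object: 0 },
  payload: `Cached object ${i}`,
}));

const windowed = getRangeInclusive(items, { group: 10, object: 0 }, { group: 20, object: 0 });
console.log(windowed.length); // 11, because both group 10 and group 20 are included
```

A linear filter is enough for illustration; a cache holding many objects would more likely use a binary search over the sorted locations.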
Implementations
LiveTrackSource
Concrete implementation of LiveObjectSource.
Constructor
const liveSource = new LiveTrackSource(stream: ReadableStream<MoqtObject>)
stream
ReadableStream<MoqtObject>
required
Stream of MoqtObjects to publish live
Example
// Location and ObjectForwardingPreference are assumed to be exported from 'moqtail'
// alongside the other names
import {
  LiveTrackSource,
  Location,
  MoqtObject,
  ObjectForwardingPreference,
} from 'moqtail';
// Assumes fullTrackName is defined elsewhere in your application
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
// Create a stream of objects (e.g., from video encoder)
const liveStream = new ReadableStream<MoqtObject>({
  async start(controller) {
    // Produce objects
    for (let i = 0; i < 100; i++) {
      const obj = MoqtObject.newWithPayload(
        fullTrackName,
        new Location(i, 0),
        0, // priority
        ObjectForwardingPreference.Subgroup,
        null,
        null,
        new TextEncoder().encode(`Object ${i}`)
      );
      controller.enqueue(obj);
      await sleep(33); // ~30fps
    }
    controller.close();
  }
});
const liveSource = new LiveTrackSource(liveStream);
// Listen to new objects
const unsubscribe = liveSource.onNewObject((obj) => {
  console.log('New object:', obj.groupId, obj.objectId);
});
// Listen for stream end
liveSource.onDone(() => {
  console.log('Live stream ended');
  unsubscribe();
});
StaticTrackSource
Concrete implementation of PastObjectSource.
Constructor
const staticSource = new StaticTrackSource(cache: ObjectCache)
cache
ObjectCache
required
ObjectCache instance containing historical objects
Example
// Location and ObjectForwardingPreference are assumed to be exported from 'moqtail'
// alongside the other names
import {
  Location,
  MemoryObjectCache,
  MoqtObject,
  ObjectForwardingPreference,
  StaticTrackSource,
} from 'moqtail';
// Assumes fullTrackName is defined elsewhere in your application
// Create and populate cache
const cache = new MemoryObjectCache();
for (let i = 0; i < 100; i++) {
  const obj = MoqtObject.newWithPayload(
    fullTrackName,
    new Location(i, 0),
    0, // priority
    ObjectForwardingPreference.Subgroup,
    null,
    null,
    new TextEncoder().encode(`Cached object ${i}`)
  );
  cache.add(obj);
}
const staticSource = new StaticTrackSource(cache);
// Fetch range (both bounds inclusive)
const objects = await staticSource.getRange(
  new Location(10, 0),
  new Location(20, 0)
);
console.log(`Fetched ${objects.length} objects`);
HybridTrackSource
Combines both live and past sources.
Constructor
const hybridSource = new HybridTrackSource(
  cache: ObjectCache,
  stream: ReadableStream<MoqtObject>
)
cache
ObjectCache
required
Cache for historical objects
stream
ReadableStream<MoqtObject>
required
Live stream of new objects
Example
// ObjectForwardingPreference is assumed to be exported from 'moqtail' alongside the other names
import {
  HybridTrackSource,
  MemoryObjectCache,
  ObjectForwardingPreference,
} from 'moqtail';
// Assumes client, fullTrackName, and createLiveStream() are defined elsewhere in your application
const cache = new MemoryObjectCache();
const liveStream = createLiveStream();
// Create hybrid source
const hybridSource = new HybridTrackSource(cache, liveStream);
// Cache live objects as they arrive
hybridSource.live.onNewObject((obj) => {
  cache.add(obj);
});
// Use in track
client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource: hybridSource,
  publisherPriority: 0
});
Usage Patterns
Live-Only Publishing
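// ObjectForwardingPreference and the TrackSource type are assumed to be exported from 'moqtail'
import {
  LiveTrackSource,
  ObjectForwardingPreference,
  type TrackSource,
} from 'moqtail';
// Assumes client and fullTrackName are defined elsewhere in your application
const liveStream = createVideoStream(); // Your stream source
const trackSource: TrackSource = {
  live: new LiveTrackSource(liveStream)
};
client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource,
  publisherPriority: 0
});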
VOD/Static Content
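// ObjectForwardingPreference and the TrackSource type are assumed to be exported from 'moqtail'
import {
  MemoryObjectCache,
  ObjectForwardingPreference,
  StaticTrackSource,
  type TrackSource,
} from 'moqtail';
// Assumes client, fullTrackName, and recording (an array of MoqtObject) are defined elsewhere
const cache = new MemoryObjectCache();
// Populate cache with pre-recorded content
recording.forEach(obj => cache.add(obj));
const trackSource: TrackSource = {
  past: new StaticTrackSource(cache)
};
client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource,
  publisherPriority: 64
});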
Hybrid Live + Catch-Up
// ObjectForwardingPreference and the TrackSource type are assumed to be exported from 'moqtail'
import {
  HybridTrackSource,
  ObjectForwardingPreference,
  RingBufferObjectCache,
  type TrackSource,
} from 'moqtail';
// Assumes client, fullTrackName, and createLiveStream() are defined elsewhere in your application
// Ring buffer keeps last N objects
const cache = new RingBufferObjectCache(500);
const liveStream = createLiveStream();
const hybrid = new HybridTrackSource(cache, liveStream);
// Auto-cache live objects
hybrid.live.onNewObject(obj => cache.add(obj));
const trackSource: TrackSource = hybrid;
client.addOrUpdateTrack({
  fullTrackName,
  forwardingPreference: ObjectForwardingPreference.Subgroup,
  trackSource,
  publisherPriority: 8
});
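With a bounded cache, the catch-up window is limited to the most recent N objects. The sketch below shows that eviction behavior in isolation. It is not the RingBufferObjectCache implementation: `BoundedCache` is a hypothetical class, and it is backed by a plain array for brevity, whereas a real ring buffer would overwrite slots in place.

```typescript
// Hypothetical bounded cache: keeps only the most recent `capacity` items.
class BoundedCache<T> {
  private items: T[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.capacity = capacity;
  }

  // Appends an item and evicts the oldest one once the capacity is exceeded.
  add(item: T): void {
    this.items.push(item);
    if (this.items.length > this.capacity) this.items.shift();
  }

  size(): number {
    return this.items.length;
  }

  // Returns a copy, oldest first, so callers cannot mutate the internal state.
  snapshot(): T[] {
    return [...this.items];
  }
}

const recent = new BoundedCache<number>(3);
for (let i = 0; i < 5; i++) recent.add(i);
console.log(recent.snapshot()); // keeps 2, 3, 4; items 0 and 1 were evicted
```

A subscriber that joins late can only catch up on what is still in the window, so the capacity should cover the longest delay you expect to serve.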
Manual TrackSource Object
Create a custom source:
const customSource: TrackSource = {
  live: {
    stream: liveStream,
    largestLocation: undefined,
    onNewObject: (listener) => { /* ... */ return () => {} },
    onDone: (listener) => { /* ... */ return () => {} },
    stop: () => { /* ... */ }
  },
  past: {
    cache: myCache,
    getRange: async (start, end) => {
      return myCache.getRange(start, end);
    }
  }
};
Stream Creation Patterns
From Generator
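// Assumes MoqtObject, Location, ObjectForwardingPreference, and LiveTrackSource are imported
// from 'moqtail', and that fullTrackName and sleep() are defined elsewhere
async function* generateObjects() {
  for (let i = 0; i < 1000; i++) {
    yield MoqtObject.newWithPayload(
      fullTrackName,
      new Location(i, 0),
      0, // priority
      ObjectForwardingPreference.Subgroup,
      null,
      null,
      new TextEncoder().encode(`Frame ${i}`)
    );
    await sleep(33); // ~30fps
  }
}
// ReadableStream.from() is not available in every runtime; check support before relying on it
const stream = ReadableStream.from(generateObjects());
const liveSource = new LiveTrackSource(stream);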
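Where `ReadableStream.from()` is unavailable, an async generator can be wrapped by hand using the `pull` callback of the ReadableStream constructor. The following is a minimal sketch: `streamFromAsyncIterable` and `countTo` are hypothetical names introduced here, not part of MOQtail.

```typescript
// Hypothetical fallback: wraps any async iterable in a ReadableStream.
function streamFromAsyncIterable<T>(iterable: AsyncIterable<T>): ReadableStream<T> {
  const iterator = iterable[Symbol.asyncIterator]();
  return new ReadableStream<T>({
    // pull() runs when the stream wants more data, so the generator
    // only advances as fast as the consumer reads.
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    // Give the generator a chance to run its cleanup if the consumer cancels.
    async cancel(reason) {
      await iterator.return?.(reason);
    },
  });
}

// Small stand-in generator used only to demonstrate the wrapper.
async function* countTo(n: number) {
  for (let i = 0; i < n; i++) yield i;
}

const numberStream = streamFromAsyncIterable(countTo(3));
```

Because the wrapper uses `pull` rather than producing everything in `start`, it respects backpressure from the reader.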
From Event Source
const stream = new ReadableStream<MoqtObject>({
  start(controller) {
    encoder.addEventListener('frame', (event) => {
      const obj = encodeToMoqtObject(event.frame);
      controller.enqueue(obj);
    });
    encoder.addEventListener('end', () => {
      controller.close();
    });
  },
  cancel() {
    encoder.stop();
  }
});
const liveSource = new LiveTrackSource(stream);
From Transform Stream
const rawStream = getRawDataStream();
const objectStream = rawStream.pipeThrough(
  new TransformStream<RawData, MoqtObject>({
    transform(chunk, controller) {
      const obj = convertToMoqtObject(chunk);
      controller.enqueue(obj);
    }
  })
);
const liveSource = new LiveTrackSource(objectStream);
Best Practices
Cache Live Objects: For hybrid sources, cache live objects as they're produced:
hybridSource.live.onNewObject(obj => cache.add(obj));
Memory Management: Use RingBufferObjectCache for live streams to prevent unbounded memory growth:
const cache = new RingBufferObjectCache(1000); // Keep last 1000 objects
Stream Cleanup: Always call stop() on LiveObjectSource when done to release resources.
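One way to guarantee that cleanup is to run the publishing work inside try/finally, so that stop() is called whether the work succeeds or throws. This is a sketch under stated assumptions: `withLiveSource` and `Stoppable` are hypothetical helpers, not part of MOQtail, and the sketch relies only on the documented stop() method.

```typescript
// Anything with a stop() method, such as a LiveObjectSource.
interface Stoppable { stop(): void }

// Hypothetical helper: runs `work` with the source and always calls stop() afterwards.
async function withLiveSource<S extends Stoppable, R>(
  source: S,
  work: (source: S) => R | Promise<R>,
): Promise<R> {
  try {
    return await work(source);
  } finally {
    // Runs on success and on error; safe to repeat because stop() is documented as idempotent.
    source.stop();
  }
}
```

A call might look like `await withLiveSource(liveSource, (source) => publishUntilDone(source))`, where `publishUntilDone` stands for your own publishing logic.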