Documentation Index Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Content sources provide objects to tracks. MOQtail supports three types of content sources:
Live sources : Real-time streaming content
Static sources : Pre-recorded or cached content
Hybrid sources : Combination of live and cached content
The TrackSource interface allows tracks to support live, cached, or both types of content simultaneously.
Content Source Architecture
export interface TrackSource {
/** Historical object access (optional) */
readonly past ?: PastObjectSource
/** Live object feed (optional) */
readonly live ?: LiveObjectSource
}
At least one of past or live must be present for a valid track.
Live Content Sources
Live sources provide real-time objects as they are created.
LiveObjectSource Interface
export interface LiveObjectSource {
/** Continuous stream yielding objects as they are produced */
readonly stream : ReadableStream < MoqtObject >
/** Highest (latest) location observed so far; undefined until first object */
readonly largestLocation : Location | undefined
/** Register a listener invoked for each new object */
onNewObject ( listener : ( obj : MoqtObject ) => void ) : () => void
/** Register a listener invoked when stream ends */
onDone ( listener : () => void ) : () => void
/** Stop ingestion and release reader */
stop () : void
}
Creating Live Sources
import { LiveTrackSource , MoqtObject , Location } from 'moqtail'
// Create ReadableStream of objects
const liveStream = new ReadableStream < MoqtObject >({
async start ( controller ) {
let groupId = 0 n
let objectId = 0 n
// Simulate video capture at 30fps
const interval = setInterval (() => {
try {
const frame = await captureVideoFrame ()
const obj = MoqtObject . newWithPayload (
fullTrackName ,
new Location ( groupId , objectId ++ ),
128 ,
ObjectForwardingPreference . Subgroup ,
0 n ,
null ,
frame
)
controller . enqueue ( obj )
// New GOP every 30 frames
if ( objectId === 30 n ) {
groupId ++
objectId = 0 n
}
} catch ( error ) {
controller . error ( error )
}
}, 33 ) // ~30fps
// Cleanup on cancel
return () => clearInterval ( interval )
}
})
// Wrap in LiveTrackSource
const liveSource = new LiveTrackSource ( liveStream )
// Use in track
const track : Track = {
fullTrackName ,
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: { live: liveSource },
publisherPriority: 0
}
Live Source Implementation
The LiveTrackSource class handles stream ingestion:
export class LiveTrackSource implements LiveObjectSource {
readonly stream : ReadableStream < MoqtObject >
readonly #listeners = new Set <( obj : MoqtObject ) => void >()
readonly #doneListeners = new Set <() => void >()
#largestLocation : Location | undefined
#reader ?: ReadableStreamDefaultReader < MoqtObject >
constructor ( stream : ReadableStream < MoqtObject >) {
this . stream = stream
this . #startIngest ()
}
get largestLocation () : Location | undefined {
return this . #largestLocation
}
onNewObject ( listener : ( obj : MoqtObject ) => void ) : () => void {
this . #listeners . add ( listener )
return () => this . #listeners . delete ( listener )
}
onDone ( listener : () => void ) : () => void {
this . #doneListeners . add ( listener )
return () => this . #doneListeners . delete ( listener )
}
stop () : void {
this . #ingestActive = false
this . #reader ?. cancel ()
}
}
How live source ingestion works
Stream reader consumes objects from the ReadableStream
Location tracking maintains the largest location seen
Listener notification invokes registered callbacks for each object
Done handling notifies listeners when stream ends
Async delivery ensures listeners don’t block ingestion
Live Source Patterns
function createCameraStream ( fullTrackName : FullTrackName ) : LiveTrackSource {
const stream = new ReadableStream < MoqtObject >({
async start ( controller ) {
const mediaStream = await navigator . mediaDevices . getUserMedia ({
video: { width: 1280 , height: 720 }
})
const track = mediaStream . getVideoTracks ()[ 0 ]
const processor = new MediaStreamTrackProcessor ({ track })
const reader = processor . readable . getReader ()
let groupId = 0 n
let objectId = 0 n
while ( true ) {
const { done , value : frame } = await reader . read ()
if ( done ) break
const encoded = await encodeFrame ( frame )
const obj = MoqtObject . newWithPayload (
fullTrackName ,
new Location ( groupId , objectId ++ ),
128 ,
ObjectForwardingPreference . Subgroup ,
0 n ,
null ,
encoded
)
controller . enqueue ( obj )
if ( isKeyFrame ( frame )) {
groupId ++
objectId = 0 n
}
}
}
})
return new LiveTrackSource ( stream )
}
Sensor Data
function createSensorStream ( fullTrackName : FullTrackName ) : LiveTrackSource {
const stream = new ReadableStream < MoqtObject >({
start ( controller ) {
let groupId = 0 n
let objectId = 0 n
// Simulate sensor readings
setInterval (() => {
const reading = {
temperature: Math . random () * 30 + 10 ,
humidity: Math . random () * 100 ,
timestamp: Date . now ()
}
const payload = new TextEncoder (). encode ( JSON . stringify ( reading ))
const obj = MoqtObject . newWithPayload (
fullTrackName ,
new Location ( groupId , objectId ++ ),
128 ,
ObjectForwardingPreference . Datagram , // Low latency
null ,
null ,
payload
)
controller . enqueue ( obj )
// New group every 100 readings
if ( objectId === 100 n ) {
groupId ++
objectId = 0 n
}
}, 100 ) // 10Hz
}
})
return new LiveTrackSource ( stream )
}
Static Content Sources
Static sources provide cached or pre-recorded content.
PastObjectSource Interface
export interface PastObjectSource {
/** Underlying cache from which objects are served */
readonly cache : ObjectCache
/**
* Fetch a range of objects. Start/end are inclusive.
* Omitted bounds mean from earliest or to latest cached.
*/
getRange ( start ?: Location , end ?: Location ) : Promise < MoqtObject []>
}
Object Caches
Two cache implementations are provided:
MemoryObjectCache
Unlimited in-memory storage with binary search:
export class MemoryObjectCache implements ObjectCache {
private objects : MoqtObject [] = []
add ( obj : MoqtObject ) : void {
const idx = this . _findInsertIndex ( obj )
this . objects . splice ( idx , 0 , obj )
}
getRange ( start ?: Location , end ?: Location ) : MoqtObject [] {
const startIdx = start ? this . _findIndex ( start , false ) : 0
const endIdx = end ? this . _findIndex ( end , true ) : this . objects . length
return this . objects . slice ( startIdx , endIdx )
}
getByLocation ( location : Location ) : MoqtObject | undefined {
const idx = this . _findIndex ( location , false )
const obj = this . objects [ idx ]
if ( obj && obj . groupId === location . group && obj . objectId === location . object ) {
return obj
}
return undefined
}
size () : number {
return this . objects . length
}
clear () : void {
this . objects = []
}
}
MemoryObjectCache maintains objects in sorted order by location, enabling efficient range queries via binary search.
RingBufferObjectCache
Fixed-size cache with automatic eviction:
export class RingBufferObjectCache implements ObjectCache {
private buffer : MoqtObject []
private maxSize : number
constructor ( maxSize : number = 100 ) {
this . buffer = []
this . maxSize = maxSize
}
add ( obj : MoqtObject ) : void {
const idx = this . _findInsertIndex ( obj )
this . buffer . splice ( idx , 0 , obj )
// Evict oldest if over capacity
if ( this . buffer . length > this . maxSize ) {
this . buffer . shift ()
}
}
// ... similar methods to MemoryObjectCache
}
Creating Static Sources
import { StaticTrackSource , MemoryObjectCache } from 'moqtail'
// Create cache
const cache = new MemoryObjectCache ()
// Populate with pre-recorded content
for ( let g = 0 n ; g < 100 n ; g ++ ) {
for ( let o = 0 n ; o < 30 n ; o ++ ) {
const obj = MoqtObject . newWithPayload (
fullTrackName ,
new Location ( g , o ),
128 ,
ObjectForwardingPreference . Subgroup ,
0 n ,
null ,
loadFrameData ( g , o )
)
cache . add ( obj )
}
}
// Create static source
const staticSource = new StaticTrackSource ( cache )
// Use in track
const vodTrack : Track = {
fullTrackName: FullTrackName . tryNew ( 'vod/content' , 'movie' ),
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: { past: staticSource },
publisherPriority: 64
}
Static Source Implementation
export class StaticTrackSource implements PastObjectSource {
readonly cache : ObjectCache
constructor ( cache : ObjectCache ) {
this . cache = cache
}
async getRange ( start ?: Location , end ?: Location ) : Promise < MoqtObject []> {
return this . cache . getRange ( start , end )
}
}
Static Source Patterns
VOD Content
async function loadVODTrack ( filePath : string ) : Promise < Track > {
const cache = new MemoryObjectCache ()
const file = await loadMediaFile ( filePath )
// Parse and cache segments
const segments = parseMediaSegments ( file )
for ( const [ groupId , segment ] of segments . entries ()) {
const frames = extractFrames ( segment )
for ( const [ objectId , frame ] of frames . entries ()) {
const obj = MoqtObject . newWithPayload (
fullTrackName ,
new Location ( BigInt ( groupId ), BigInt ( objectId )),
128 ,
ObjectForwardingPreference . Subgroup ,
0 n ,
null ,
frame . data
)
cache . add ( obj )
}
}
return {
fullTrackName: FullTrackName . tryNew ( 'vod/movies' , path . basename ( filePath )),
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: { past: new StaticTrackSource ( cache ) },
publisherPriority: 64
}
}
File Transfer
function createFileTransferTrack ( file : File ) : Track {
const cache = new MemoryObjectCache ()
const chunkSize = 64 * 1024 // 64KB chunks
let groupId = 0 n
let objectId = 0 n
for ( let offset = 0 ; offset < file . size ; offset += chunkSize ) {
const chunk = file . slice ( offset , offset + chunkSize )
const obj = MoqtObject . newWithPayload (
FullTrackName . tryNew ( 'files/transfer' , file . name ),
new Location ( groupId , objectId ++ ),
128 ,
ObjectForwardingPreference . Subgroup ,
0 n ,
null ,
new Uint8Array ( await chunk . arrayBuffer ())
)
cache . add ( obj )
// New group every 10MB
if ( offset % ( 10 * 1024 * 1024 ) === 0 ) {
groupId ++
objectId = 0 n
}
}
return {
fullTrackName: FullTrackName . tryNew ( 'files/transfer' , file . name ),
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: { past: new StaticTrackSource ( cache ) },
publisherPriority: 128
}
}
Hybrid Content Sources
Hybrid sources combine live and cached content for catch-up scenarios.
HybridTrackSource Implementation
export class HybridTrackSource implements TrackSource {
readonly past : PastObjectSource
readonly live : LiveObjectSource
constructor ( cache : ObjectCache , stream : ReadableStream < MoqtObject >) {
this . past = new StaticTrackSource ( cache )
this . live = new LiveTrackSource ( stream )
}
}
Creating Hybrid Sources
import { HybridTrackSource , RingBufferObjectCache } from 'moqtail'
// Create ring buffer for recent history
const cache = new RingBufferObjectCache ( 300 ) // Last 10 seconds @ 30fps
// Create live stream that also populates cache
const liveStream = new ReadableStream < MoqtObject >({
start ( controller ) {
let groupId = 0 n
let objectId = 0 n
setInterval (() => {
const frame = captureFrame ()
const obj = MoqtObject . newWithPayload (
fullTrackName ,
new Location ( groupId , objectId ++ ),
128 ,
ObjectForwardingPreference . Subgroup ,
0 n ,
null ,
frame
)
// Add to cache AND stream
cache . add ( obj )
controller . enqueue ( obj )
if ( objectId === 30 n ) {
groupId ++
objectId = 0 n
}
}, 33 )
}
})
// Create hybrid source
const hybridSource = new HybridTrackSource ( cache , liveStream )
// Use in track
const track : Track = {
fullTrackName: FullTrackName . tryNew ( 'live/stream' , 'video' ),
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: {
past: hybridSource . past ,
live: hybridSource . live
},
publisherPriority: 0
}
Hybrid Source Patterns
Catch-up TV
function createCatchUpStream () : HybridTrackSource {
// Cache last 2 hours
const cache = new RingBufferObjectCache ( 216000 ) // 2hrs @ 30fps
const liveStream = new ReadableStream < MoqtObject >({
async start ( controller ) {
const mediaStream = await getLiveBroadcast ()
// ... encode and stream frames
// Add each frame to cache as it's streamed
}
})
return new HybridTrackSource ( cache , liveStream )
}
// Subscribers can:
// 1. Fetch recent history: await client.fetch(...)
// 2. Then subscribe to live: await client.subscribe(...)
DVR Functionality
function createDVRTrack () : Track {
const cache = new MemoryObjectCache () // Unlimited history
const liveStream = new ReadableStream < MoqtObject >({
start ( controller ) {
// Stream live content while building cache
setInterval (() => {
const obj = captureLiveFrame ()
cache . add ( obj )
controller . enqueue ( obj )
}, 33 )
}
})
return {
fullTrackName: FullTrackName . tryNew ( 'live/dvr' , 'stream' ),
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: new HybridTrackSource ( cache , liveStream ),
publisherPriority: 0
}
}
Choosing the right cache size
Small cache (RingBufferObjectCache with 100-500 objects):
Short catch-up window (few seconds)
Low memory footprint
Good for: Real-time communication, live sports
Medium cache (RingBufferObjectCache with 5000-50000 objects):
Medium catch-up window (minutes to hours)
Reasonable memory usage
Good for: Live events, news, broadcasts
Large cache (MemoryObjectCache, unlimited):
Full history available
High memory usage
Good for: DVR, replay, complete archives
Content Source Lifecycle
Publisher Side
// 1. Create content source
const liveSource = new LiveTrackSource ( liveStream )
// 2. Register listeners (optional)
const unsubscribe = liveSource . onNewObject (( obj ) => {
console . log ( 'New object:' , obj . location )
})
// 3. Create track with source
const track : Track = {
fullTrackName ,
forwardingPreference: ObjectForwardingPreference . Subgroup ,
trackSource: { live: liveSource },
publisherPriority: 0
}
// 4. Add to client
client . addOrUpdateTrack ( track )
// 5. Cleanup when done
unsubscribe ()
liveSource . stop ()
client . removeTrack ( track )
Subscriber Side
Subscribers don’t interact with content sources directly. They receive objects through subscriptions and fetches managed by the MOQtailClient.
Learn More Explore the complete Track API reference