Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt

Use this file to discover all available pages before exploring further.

Relay

The relay module provides server-side functionality for building MOQT relay servers that forward media streams between publishers and subscribers. A relay acts as an intermediary that accepts published tracks and redistributes them to subscribed clients.
The relay module structure is defined in Moqtail but implementation is primarily through the transport and model layers.

Relay Architecture

A MOQT relay server follows this architecture:
┌─────────────────────────────────────────────┐
│         Relay Application Logic             │
│  (Accept connections, route streams)        │
└──────────┬────────────────┬─────────────────┘
           │                │
    ┌──────▼──────┐  ┌──────▼──────┐
    │ Publisher   │  │ Subscriber  │
    │ Connection  │  │ Connection  │
    └──────┬──────┘  └──────┬──────┘
           │                │
    ┌──────▼────────────────▼──────┐
    │   ControlStreamHandler        │
    │   (Setup, Publish, Subscribe) │
    └──────┬──────────────┬─────────┘
           │              │
    ┌──────▼──────┐ ┌─────▼─────────┐
    │  Receive    │ │    Forward    │
    │  Media      │ │    Media      │
    └─────────────┘ └───────────────┘

Server Setup

Creating a Server Endpoint

Set up a WebTransport/QUIC server:
use wtransport::{ServerConfig, Endpoint, Identity};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    
    // Create server identity (certificate)
    let server_identity = Identity::self_signed(std::iter::once("localhost"))?;
    
    // Create server configuration
    let server_config = ServerConfig::builder()
        .with_bind_address("0.0.0.0:4433".parse::<SocketAddr>()?)
        .with_identity(server_identity)
        .build();
    
    // Create server endpoint
    let server_endpoint = Endpoint::server(server_config)?;
    
    println!("MOQT relay listening on {}", server_endpoint.local_addr()?);
    
    // Accept connections
    accept_connections(server_endpoint).await?;
    
    Ok(())
}

Accepting Connections

Handle incoming connections in a loop:
use wtransport::Endpoint;

async fn accept_connections(endpoint: Endpoint) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // Accept incoming connection
        let incoming = endpoint.accept().await;
        
        // Spawn task to handle connection
        tokio::spawn(async move {
            // Await session request
            let session_request = match incoming.await {
                Ok(req) => req,
                Err(e) => {
                    eprintln!("Failed to receive session request: {:?}", e);
                    return;
                }
            };
            
            // Accept the session
            let connection = match session_request.accept().await {
                Ok(conn) => conn,
                Err(e) => {
                    eprintln!("Failed to accept session: {:?}", e);
                    return;
                }
            };
            
            println!("New connection from: {:?}", connection.remote_address());
            
            // Handle the connection
            if let Err(e) = handle_connection(connection).await {
                eprintln!("Connection error: {:?}", e);
            }
        });
    }
}

Connection Handling

Server Setup Handshake

Perform the MOQT setup handshake with clients:
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::server_setup::ServerSetup;
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::constant::DRAFT_14;
use moqtail::model::common::pair::KeyValuePair;
use moqtail::model::error::TerminationCode;
use wtransport::Connection;
use bytes::Bytes;

async fn handle_connection(connection: Connection) -> Result<(), Box<dyn std::error::Error>> {
    // Accept control stream
    let (send_stream, recv_stream) = connection.accept_bi().await?;
    
    // Set control stream to max priority
    send_stream.set_priority(i32::MAX);
    
    let mut control_handler = ControlStreamHandler::new(send_stream, recv_stream);
    
    // Receive client setup
    match control_handler.next_message().await {
        Ok(ControlMessage::ClientSetup(client_setup)) => {
            println!("Client supports versions: {:?}", client_setup.supported_versions);
            
            // Check if we support any of the client's versions
            if !client_setup.supported_versions.contains(&DRAFT_14) {
                return Err("No compatible version".into());
            }
            
            // Send server setup response
            let server_setup = ServerSetup {
                selected_version: DRAFT_14,
                setup_parameters: vec![
                    KeyValuePair::try_new_bytes(
                        1,
                        Bytes::from_static(b"relay-server")
                    ).unwrap(),
                ],
            };
            
            control_handler
                .send(&ControlMessage::ServerSetup(Box::new(server_setup)))
                .await?;
            
            println!("Setup handshake complete");
        }
        Ok(_) => return Err("Expected ClientSetup message".into()),
        Err(e) => return Err(format!("Setup error: {:?}", e).into()),
    }
    
    // Handle client messages
    handle_client_messages(control_handler, connection).await
}

Processing Control Messages

Handle different types of control messages:
use moqtail::model::control::publish::Publish;
use moqtail::model::control::publish_namespace::PublishNamespace;
use moqtail::model::control::subscribe::Subscribe;

async fn handle_client_messages(
    mut control: ControlStreamHandler,
    connection: Connection,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        match control.next_message().await {
            Ok(ControlMessage::PublishNamespace(publish_ns)) => {
                handle_publish_namespace(&mut control, publish_ns).await?;
            }
            Ok(ControlMessage::Publish(publish)) => {
                handle_publish(&mut control, publish).await?;
            }
            Ok(ControlMessage::Subscribe(subscribe)) => {
                handle_subscribe(&mut control, subscribe).await?;
            }
            Ok(ControlMessage::Unsubscribe(unsub)) => {
                handle_unsubscribe(&mut control, unsub).await?;
            }
            Ok(msg) => {
                println!("Received control message: {:?}", msg.get_type());
            }
            Err(TerminationCode::NoError) => {
                println!("Client disconnected cleanly");
                break;
            }
            Err(e) => {
                eprintln!("Control stream error: {:?}", e);
                break;
            }
        }
    }
    
    Ok(())
}

Publishing

Handling Publish Namespace

Process namespace publication announcements:
use moqtail::model::control::publish_namespace::PublishNamespace;
use moqtail::model::control::publish_namespace_ok::PublishNamespaceOk;
use moqtail::model::control::publish_namespace_error::PublishNamespaceError;
use moqtail::model::control::constant::PublishNamespaceErrorCode;
use moqtail::model::common::reason_phrase::ReasonPhrase;

async fn handle_publish_namespace(
    control: &mut ControlStreamHandler,
    publish_ns: Box<PublishNamespace>,
) -> Result<(), Box<dyn std::error::Error>> {
    println!("Publish namespace request: {:?}", publish_ns.track_namespace);
    
    // Validate namespace (implement your authorization logic)
    let authorized = authorize_namespace(&publish_ns.track_namespace).await;
    
    if authorized {
        // Accept the publication
        let response = PublishNamespaceOk {
            request_id: publish_ns.request_id,
        };
        
        control
            .send(&ControlMessage::PublishNamespaceOk(Box::new(response)))
            .await?;
        
        println!("Accepted namespace publication");
    } else {
        // Reject the publication
        let response = PublishNamespaceError {
            request_id: publish_ns.request_id,
            error_code: PublishNamespaceErrorCode::Unauthorized,
            reason_phrase: ReasonPhrase::try_new("Unauthorized namespace".to_string())?,
        };
        
        control
            .send(&ControlMessage::PublishNamespaceError(Box::new(response)))
            .await?;
    }
    
    Ok(())
}

async fn authorize_namespace(namespace: &Tuple) -> bool {
    // Implement your authorization logic
    // For example, check against allowed namespaces
    true
}

Handling Track Publication

Process individual track publications:
use moqtail::model::control::publish::Publish;
use moqtail::model::control::publish_ok::PublishOk;
use moqtail::model::control::publish_error::PublishError;
use moqtail::model::control::constant::PublishErrorCode;

async fn handle_publish(
    control: &mut ControlStreamHandler,
    publish: Box<Publish>,
) -> Result<(), Box<dyn std::error::Error>> {
    println!("Publish track: {}", publish.get_full_track_name());
    
    // Register the track for forwarding
    let track_alias = register_track(publish.get_full_track_name()).await?;
    
    // Send publish OK with track alias
    let response = PublishOk {
        request_id: publish.request_id,
        track_alias,
        expires: 3600, // 1 hour
    };
    
    control
        .send(&ControlMessage::PublishOk(Box::new(response)))
        .await?;
    
    Ok(())
}

async fn register_track(
    track_name: FullTrackName,
) -> Result<u64, Box<dyn std::error::Error>> {
    // Implement track registration logic
    // Assign a unique track alias for this connection
    Ok(1)
}

Subscription Management

Handling Subscribe Requests

Process subscription requests from clients:
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::control::subscribe_ok::SubscribeOk;
use moqtail::model::control::subscribe_error::SubscribeError;
use moqtail::model::control::constant::{GroupOrder, SubscribeErrorCode};
use moqtail::model::common::location::Location;

async fn handle_subscribe(
    control: &mut ControlStreamHandler,
    subscribe: Box<Subscribe>,
) -> Result<(), Box<dyn std::error::Error>> {
    let track_name = subscribe.get_full_track_name();
    println!("Subscribe request for: {}", track_name);
    
    // Check if track exists
    if let Some(track_info) = find_track(&track_name).await {
        // Assign track alias for this subscription
        let track_alias = assign_track_alias(&track_name).await?;
        
        // Send subscribe OK
        let response = SubscribeOk {
            request_id: subscribe.request_id,
            track_alias,
            expires: 3600,
            group_order: GroupOrder::Ascending,
            content_exists: true,
            largest_location: Some(Location {
                group: track_info.latest_group,
                object: track_info.latest_object,
            }),
            subscribe_parameters: None,
        };
        
        control
            .send(&ControlMessage::SubscribeOk(Box::new(response)))
            .await?;
        
        // Start forwarding data to this subscriber
        start_forwarding(track_alias, subscribe).await?;
    } else {
        // Track not found
        let response = SubscribeError {
            request_id: subscribe.request_id,
            error_code: SubscribeErrorCode::TrackNotFound,
            reason_phrase: ReasonPhrase::try_new("Track not available".to_string())?,
            track_alias: 0,
        };
        
        control
            .send(&ControlMessage::SubscribeError(Box::new(response)))
            .await?;
    }
    
    Ok(())
}

struct TrackInfo {
    latest_group: u64,
    latest_object: u64,
}

async fn find_track(track_name: &FullTrackName) -> Option<TrackInfo> {
    // Implement track lookup logic
    Some(TrackInfo {
        latest_group: 42,
        latest_object: 10,
    })
}

async fn assign_track_alias(
    track_name: &FullTrackName,
) -> Result<u64, Box<dyn std::error::Error>> {
    // Assign unique track alias
    Ok(1)
}

async fn start_forwarding(
    track_alias: u64,
    subscribe: Box<Subscribe>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Implement data forwarding logic
    Ok(())
}

Handling Unsubscribe

Clean up subscriptions:
use moqtail::model::control::unsubscribe::Unsubscribe;

async fn handle_unsubscribe(
    control: &mut ControlStreamHandler,
    unsub: Box<Unsubscribe>,
) -> Result<(), Box<dyn std::error::Error>> {
    println!("Unsubscribe from track alias: {}", unsub.track_alias);
    
    // Stop forwarding to this subscriber
    stop_forwarding(unsub.track_alias).await?;
    
    Ok(())
}

async fn stop_forwarding(track_alias: u64) -> Result<(), Box<dyn std::error::Error>> {
    // Implement forwarding cleanup
    Ok(())
}

State Management

Track Registry

Maintain a registry of active tracks:
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use moqtail::model::data::full_track_name::FullTrackName;
use moqtail::model::data::track::Track;

struct RelayState {
    tracks: Arc<RwLock<HashMap<FullTrackName, Track>>>,
    publishers: Arc<RwLock<HashMap<u64, Connection>>>,
    subscribers: Arc<RwLock<HashMap<u64, Vec<Connection>>>>,
}

impl RelayState {
    fn new() -> Self {
        Self {
            tracks: Arc::new(RwLock::new(HashMap::new())),
            publishers: Arc::new(RwLock::new(HashMap::new())),
            subscribers: Arc::new(RwLock::new(HashMap::new())),
        }
    }
    
    async fn add_track(&self, track: Track) {
        let mut tracks = self.tracks.write().await;
        tracks.insert(track.full_track_name.clone(), track);
    }
    
    async fn get_track(&self, name: &FullTrackName) -> Option<Track> {
        let tracks = self.tracks.read().await;
        tracks.get(name).cloned()
    }
    
    async fn add_subscriber(&self, track_alias: u64, conn: Connection) {
        let mut subs = self.subscribers.write().await;
        subs.entry(track_alias).or_insert_with(Vec::new).push(conn);
    }
}

Complete Relay Example

Here’s a simplified complete relay implementation:
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::constant::DRAFT_14;
use moqtail::model::error::TerminationCode;
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use std::sync::Arc;
use tokio::sync::RwLock;
use wtransport::{Endpoint, Identity, ServerConfig};

struct Relay {
    state: Arc<RelayState>,
}

impl Relay {
    fn new() -> Self {
        Self {
            state: Arc::new(RelayState::new()),
        }
    }
    
    async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
        let identity = Identity::self_signed(std::iter::once("localhost"))?;
        let config = ServerConfig::builder()
            .with_bind_address("0.0.0.0:4433".parse()?)
            .with_identity(identity)
            .build();
        
        let endpoint = Endpoint::server(config)?;
        println!("Relay listening on {}", endpoint.local_addr()?);
        
        loop {
            let incoming = endpoint.accept().await;
            let state = Arc::clone(&self.state);
            
            tokio::spawn(async move {
                let session_request = incoming.await.unwrap();
                let connection = session_request.accept().await.unwrap();
                let (send, recv) = connection.accept_bi().await.unwrap();
                let mut control = ControlStreamHandler::new(send, recv);
                
                // Handle setup and messages
                handle_client(&mut control, state).await.ok();
            });
        }
    }
}

async fn handle_client(
    control: &mut ControlStreamHandler,
    state: Arc<RelayState>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Setup handshake (omitted for brevity)
    
    // Message loop
    loop {
        match control.next_message().await {
            Ok(msg) => { /* handle messages */ }
            Err(TerminationCode::NoError) => break,
            Err(_) => break,
        }
    }
    
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let relay = Relay::new();
    relay.run().await
}

Next Steps

Transport Layer

Deep dive into control and data stream handlers

Client Implementation

Build clients that connect to your relay

Protocol Model

Understand MOQT messages and data structures