Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt

Use this file to discover all available pages before exploring further.

Client

The client module provides functionality for building MOQT client applications that can connect to relays and consume media streams. While the module structure is defined, most client-side functionality is implemented through the transport and model layers.
The client module is currently minimal in the Moqtail library. Client applications primarily use the transport handlers and protocol models directly.

Client Architecture

A typical MOQT client application follows this architecture:
┌─────────────────────────────────┐
│   Client Application Logic      │
│  (Subscribe, consume media)     │
└────────────┬────────────────────┘

┌────────────▼────────────────────┐
│  ControlStreamHandler           │
│  (Setup, Subscribe, etc.)       │
└────────────┬────────────────────┘

┌────────────▼────────────────────┐
│  DataStreamHandler              │
│  (Receive media objects)        │
└────────────┬────────────────────┘

┌────────────▼────────────────────┐
│    wtransport (QUIC)            │
│  (Network transport)            │
└─────────────────────────────────┘

Basic Client Setup

Establishing a Connection

Create a QUIC connection using wtransport:
use wtransport::{ClientConfig, Endpoint};
use wtransport::endpoint::IntoConnectOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client configuration
    let client_config = ClientConfig::builder()
        .with_bind_default()
        .with_server_certificate_hashes(vec![server_cert_hash])
        .build();
    
    // Create client endpoint
    let client_endpoint = Endpoint::client(client_config)?;
    
    // Connect to server
    let connection = client_endpoint
        .connect("https://relay.example.com:4433".into_options())
        .await?;
    
    println!("Connected to relay server");
    
    Ok(())
}

Opening Control Stream

After establishing a connection, open a bidirectional control stream:
use moqtail::transport::control_stream_handler::ControlStreamHandler;

// Open bidirectional stream for control messages
let (send_stream, recv_stream) = connection
    .open_bi()
    .await?
    .await?;

// Create control stream handler
let mut control_handler = ControlStreamHandler::new(send_stream, recv_stream);

Client Setup Handshake

Sending Client Setup

Initiate the MOQT handshake by sending a ClientSetup message:
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::constant::DRAFT_14;
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::common::pair::KeyValuePair;
use bytes::Bytes;

// Create client setup message
let supported_versions = vec![DRAFT_14];
let setup_parameters = vec![
    KeyValuePair::try_new_varint(0, 10).unwrap(),
    KeyValuePair::try_new_bytes(1, Bytes::from_static(b"client-role")).unwrap(),
];

let client_setup = ClientSetup::new(supported_versions, setup_parameters);
let setup_msg = ControlMessage::ClientSetup(Box::new(client_setup));

// Send setup message
control_handler.send(&setup_msg).await?;

Receiving Server Setup

Wait for the server’s setup response:
use moqtail::model::control::server_setup::ServerSetup;
use moqtail::model::error::TerminationCode;

// Receive server setup message
match control_handler.next_message().await {
    Ok(ControlMessage::ServerSetup(server_setup)) => {
        println!("Server selected version: {}", server_setup.selected_version);
        
        if server_setup.selected_version != DRAFT_14 {
            return Err("Unsupported version".into());
        }
    }
    Ok(_) => return Err("Expected ServerSetup message".into()),
    Err(TerminationCode::ProtocolViolation) => {
        return Err("Protocol violation during setup".into());
    }
    Err(e) => return Err(format!("Setup failed: {:?}", e).into()),
}

Subscribing to Tracks

Creating a Subscribe Request

Subscribe to a specific track to receive media:
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::control::constant::{FilterType, GroupOrder};
use moqtail::model::common::tuple::{Tuple, TupleField};
use moqtail::model::common::location::Location;

// Create subscribe message for latest content
let subscribe = Subscribe::new_latest_object(
    request_id: 1,
    track_namespace: Tuple::from_utf8_path("conference/room42"),
    track_name: TupleField::from_utf8("video"),
    subscriber_priority: 128,
    group_order: GroupOrder::Ascending,
    forward: true,
    subscribe_parameters: vec![],
);

let subscribe_msg = ControlMessage::Subscribe(Box::new(subscribe));
control_handler.send(&subscribe_msg).await?;

Subscribe with Specific Range

Subscribe to a specific range of content:
// Subscribe to specific range of groups
let start_location = Location {
    group: 100,
    object: 0,
};

let subscribe = Subscribe::new_absolute_range(
    request_id: 2,
    track_namespace: Tuple::from_utf8_path("conference/room42"),
    track_name: TupleField::from_utf8("audio"),
    subscriber_priority: 128,
    group_order: GroupOrder::Ascending,
    forward: true,
    start_location,
    end_group: 200,
    subscribe_parameters: vec![],
);

let subscribe_msg = ControlMessage::Subscribe(Box::new(subscribe));
control_handler.send(&subscribe_msg).await?;

Handling Subscribe Response

Process the subscription acknowledgment:
use moqtail::model::control::subscribe_ok::SubscribeOk;
use moqtail::model::control::subscribe_error::SubscribeError;

match control_handler.next_message().await? {
    ControlMessage::SubscribeOk(sub_ok) => {
        println!("Subscription successful!");
        println!("Track alias: {}", sub_ok.track_alias);
        println!("Expires in: {} seconds", sub_ok.expires);
        println!("Group order: {:?}", sub_ok.group_order);
        
        if sub_ok.content_exists {
            if let Some(largest) = sub_ok.largest_location {
                println!("Latest content at group {}, object {}", 
                    largest.group, largest.object);
            }
        }
    }
    ControlMessage::SubscribeError(sub_err) => {
        eprintln!("Subscription failed: {}", sub_err.reason_phrase);
        return Err("Subscribe error".into());
    }
    _ => return Err("Unexpected message type".into()),
}

Receiving Media Data

Processing Control Messages

Continuously process control messages in a loop:
use moqtail::model::control::track_status::TrackStatus;
use moqtail::model::control::subscribe_update::SubscribeUpdate;

tokio::spawn(async move {
    loop {
        match control_handler.next_message().await {
            Ok(ControlMessage::TrackStatus(status)) => {
                println!("Track status update received");
                // Handle track status
            }
            Ok(ControlMessage::SubscribeUpdate(update)) => {
                println!("Subscribe parameters updated");
                // Handle subscription update
            }
            Ok(msg) => {
                println!("Received control message: {:?}", msg.get_type());
            }
            Err(TerminationCode::NoError) => {
                println!("Connection closed cleanly");
                break;
            }
            Err(e) => {
                eprintln!("Control stream error: {:?}", e);
                break;
            }
        }
    }
});

Reading Data Streams

Accept and process incoming data streams:
use moqtail::transport::data_stream_handler::DataStreamHandler;

// Accept incoming unidirectional streams for data
loop {
    match connection.accept_uni().await {
        Ok(recv_stream) => {
            tokio::spawn(async move {
                let mut buffer = vec![0u8; 4096];
                
                loop {
                    match recv_stream.read(&mut buffer).await {
                        Ok(Some(n)) => {
                            println!("Received {} bytes of media data", n);
                            // Process media data
                        }
                        Ok(None) => {
                            println!("Stream closed");
                            break;
                        }
                        Err(e) => {
                            eprintln!("Stream error: {:?}", e);
                            break;
                        }
                    }
                }
            });
        }
        Err(e) => {
            eprintln!("Error accepting stream: {:?}", e);
            break;
        }
    }
}

Complete Client Example

Here’s a complete working client example:
use moqtail::model::common::pair::KeyValuePair;
use moqtail::model::common::tuple::{Tuple, TupleField};
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::constant::{DRAFT_14, GroupOrder};
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::error::TerminationCode;
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use wtransport::{ClientConfig, Endpoint};
use wtransport::endpoint::IntoConnectOptions;
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    
    // Connect to relay
    let client_config = ClientConfig::builder()
        .with_bind_default()
        .with_server_certificate_hashes(vec![server_cert_hash])
        .build();
    
    let endpoint = Endpoint::client(client_config)?;
    let connection = endpoint
        .connect("https://relay.example.com:4433".into_options())
        .await?;
    
    // Setup control stream
    let (send, recv) = connection.open_bi().await?.await?;
    let mut control = ControlStreamHandler::new(send, recv);
    
    // Send client setup
    let client_setup = ClientSetup::new(
        vec![DRAFT_14],
        vec![KeyValuePair::try_new_bytes(1, Bytes::from_static(b"subscriber")).unwrap()],
    );
    control.send(&ControlMessage::ClientSetup(Box::new(client_setup))).await?;
    
    // Receive server setup
    match control.next_message().await? {
        ControlMessage::ServerSetup(setup) => {
            println!("Connected with version {}", setup.selected_version);
        }
        _ => return Err("Unexpected setup response".into()),
    }
    
    // Subscribe to track
    let subscribe = Subscribe::new_latest_object(
        1,
        Tuple::from_utf8_path("conference/main"),
        TupleField::from_utf8("video"),
        128,
        GroupOrder::Ascending,
        true,
        vec![],
    );
    control.send(&ControlMessage::Subscribe(Box::new(subscribe))).await?;
    
    // Wait for subscribe OK
    match control.next_message().await? {
        ControlMessage::SubscribeOk(ok) => {
            println!("Subscribed with track alias {}", ok.track_alias);
        }
        _ => return Err("Subscribe failed".into()),
    }
    
    // Process messages
    loop {
        match control.next_message().await {
            Ok(msg) => println!("Received: {:?}", msg.get_type()),
            Err(TerminationCode::NoError) => break,
            Err(e) => {
                eprintln!("Error: {:?}", e);
                break;
            }
        }
    }
    
    Ok(())
}

Error Handling

Timeout Handling

Handle control message timeouts:
match control_handler.next_message().await {
    Err(TerminationCode::ControlMessageTimeout) => {
        eprintln!("Control message timeout - partial message not completed");
        // Reconnect or terminate
    }
    Err(TerminationCode::DataStreamTimeout) => {
        eprintln!("Data stream timeout");
        // Handle timeout
    }
    Err(e) => eprintln!("Error: {:?}", e),
    Ok(msg) => { /* process message */ }
}

Connection Recovery

Implement reconnection logic:
async fn connect_with_retry(
    endpoint: &Endpoint,
    url: &str,
    max_retries: u32,
) -> Result<Connection, Box<dyn std::error::Error>> {
    let mut retries = 0;
    
    loop {
        match endpoint.connect(url.into_options()).await {
            Ok(conn) => return Ok(conn),
            Err(e) if retries < max_retries => {
                retries += 1;
                eprintln!("Connection failed (attempt {}): {:?}", retries, e);
                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
            }
            Err(e) => return Err(e.into()),
        }
    }
}

Advanced Topics

Managing Multiple Subscriptions

Track multiple active subscriptions:
use std::collections::HashMap;

struct SubscriptionManager {
    active_subs: HashMap<u64, String>,
    next_request_id: u64,
}

impl SubscriptionManager {
    fn new() -> Self {
        Self {
            active_subs: HashMap::new(),
            next_request_id: 1,
        }
    }
    
    async fn subscribe(
        &mut self,
        control: &mut ControlStreamHandler,
        namespace: &str,
        name: &str,
    ) -> Result<u64, Box<dyn std::error::Error>> {
        let request_id = self.next_request_id;
        self.next_request_id += 1;
        
        let subscribe = Subscribe::new_latest_object(
            request_id,
            Tuple::from_utf8_path(namespace),
            TupleField::from_utf8(name),
            128,
            GroupOrder::Ascending,
            true,
            vec![],
        );
        
        control.send(&ControlMessage::Subscribe(Box::new(subscribe))).await?;
        self.active_subs.insert(request_id, format!("{}/{}", namespace, name));
        
        Ok(request_id)
    }
}

Next Steps

Transport Layer

Deep dive into control and data stream handlers

Protocol Model

Explore MOQT message types and data structures

Relay Server

Build a relay server to forward streams