Documentation Index
Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt
Use this file to discover all available pages before exploring further.
Client
The client module provides functionality for building MOQT client applications that can connect to relays and consume media streams. While the module structure is defined, most client-side functionality is implemented through the transport and model layers.
Note: The client module is currently minimal in the Moqtail library. Client applications primarily use the transport handlers and protocol models directly.
Client Architecture
A typical MOQT client application follows this architecture:

┌─────────────────────────────────┐
│   Client Application Logic      │
│   (Subscribe, consume media)    │
└────────────┬────────────────────┘
             │
┌────────────▼────────────────────┐
│   ControlStreamHandler          │
│   (Setup, Subscribe, etc.)      │
└────────────┬────────────────────┘
             │
┌────────────▼────────────────────┐
│   DataStreamHandler             │
│   (Receive media objects)       │
└────────────┬────────────────────┘
             │
┌────────────▼────────────────────┐
│   wtransport (QUIC)             │
│   (Network transport)           │
└─────────────────────────────────┘
Basic Client Setup
Establishing a Connection
Create a QUIC connection using wtransport:
use wtransport::{ClientConfig, Endpoint};
use wtransport::endpoint::IntoConnectOptions;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client configuration
let client_config = ClientConfig::builder()
.with_bind_default()
.with_server_certificate_hashes(vec![server_cert_hash])
.build();
// Create client endpoint
let client_endpoint = Endpoint::client(client_config)?;
// Connect to server
let connection = client_endpoint
.connect("https://relay.example.com:4433".into_options())
.await?;
println!("Connected to relay server");
Ok(())
}
Opening Control Stream
After establishing a connection, open a bidirectional control stream:
use moqtail::transport::control_stream_handler::ControlStreamHandler;
// Open the bidirectional stream that carries control messages.
// `open_bi()` is awaited twice: the first await produces a pending stream,
// and awaiting that yields the send and receive halves.
let opening_stream = connection.open_bi().await?;
let (send_stream, recv_stream) = opening_stream.await?;

// Wrap both halves in the handler used for all control-message traffic.
let mut control_handler = ControlStreamHandler::new(send_stream, recv_stream);
Client Setup Handshake
Sending Client Setup
Initiate the MOQT handshake by sending a ClientSetup message:
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::constant::DRAFT_14;
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::common::pair::KeyValuePair;
use bytes::Bytes;
// Build the ClientSetup message from the supported versions and the setup
// parameters (one varint-valued pair and one byte-string-valued pair).
let client_setup = ClientSetup::new(
    vec![DRAFT_14],
    vec![
        KeyValuePair::try_new_varint(0, 10).unwrap(),
        KeyValuePair::try_new_bytes(1, Bytes::from_static(b"client-role")).unwrap(),
    ],
);

// Wrap it in the control-message enum and send it on the control stream.
control_handler
    .send(&ControlMessage::ClientSetup(Box::new(client_setup)))
    .await?;
Receiving Server Setup
Wait for the server’s setup response:
use moqtail::model::control::server_setup::ServerSetup;
use moqtail::model::error::TerminationCode;
// Receive server setup message
match control_handler.next_message().await {
Ok(ControlMessage::ServerSetup(server_setup)) => {
println!("Server selected version: {}", server_setup.selected_version);
if server_setup.selected_version != DRAFT_14 {
return Err("Unsupported version".into());
}
}
Ok(_) => return Err("Expected ServerSetup message".into()),
Err(TerminationCode::ProtocolViolation) => {
return Err("Protocol violation during setup".into());
}
Err(e) => return Err(format!("Setup failed: {:?}", e).into()),
}
Subscribing to Tracks
Creating a Subscribe Request
Subscribe to a specific track to receive media:
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::control::constant::{FilterType, GroupOrder};
use moqtail::model::common::tuple::{Tuple, TupleField};
use moqtail::model::common::location::Location;
// Create subscribe message for latest content.
// Rust has no named arguments, so the values are passed positionally and
// each one is labelled with a trailing comment.
let subscribe = Subscribe::new_latest_object(
    // request_id: client request IDs are even and start at 0 (MOQT draft-14).
    0,
    Tuple::from_utf8_path("conference/room42"), // track_namespace
    TupleField::from_utf8("video"),             // track_name
    128,                                        // subscriber_priority
    GroupOrder::Ascending,                      // group_order
    true,                                       // forward
    vec![],                                     // subscribe_parameters
);

// Wrap the request in the control-message enum and send it to the relay.
let subscribe_msg = ControlMessage::Subscribe(Box::new(subscribe));
control_handler.send(&subscribe_msg).await?;
Subscribe with Specific Range
Subscribe to a specific range of content:
// Subscribe to specific range of groups
// First object to deliver: object 0 of group 100.
let start_location = Location {
    group: 100,
    object: 0,
};

// Rust has no named arguments, so the values are passed positionally and
// each one is labelled with a trailing comment.
let subscribe = Subscribe::new_absolute_range(
    2,                                          // request_id (even for clients)
    Tuple::from_utf8_path("conference/room42"), // track_namespace
    TupleField::from_utf8("audio"),             // track_name
    128,                                        // subscriber_priority
    GroupOrder::Ascending,                      // group_order
    true,                                       // forward
    start_location,                             // start_location
    200,                                        // end_group
    vec![],                                     // subscribe_parameters
);

// Wrap the request in the control-message enum and send it to the relay.
let subscribe_msg = ControlMessage::Subscribe(Box::new(subscribe));
control_handler.send(&subscribe_msg).await?;
Handling Subscribe Response
Process the subscription acknowledgment:
use moqtail::model::control::subscribe_ok::SubscribeOk;
use moqtail::model::control::subscribe_error::SubscribeError;
// The next control message is expected to be the relay's answer to the
// subscribe request: either SubscribeOk or SubscribeError.
match control_handler.next_message().await? {
    ControlMessage::SubscribeOk(sub_ok) => {
        println!("Subscription successful!");
        println!("Track alias: {}", sub_ok.track_alias);
        // SUBSCRIBE_OK carries Expires in milliseconds (MOQT draft-14).
        println!("Expires in: {} ms", sub_ok.expires);
        println!("Group order: {:?}", sub_ok.group_order);
        // The largest location is only reported when content already exists.
        if sub_ok.content_exists {
            if let Some(largest) = sub_ok.largest_location {
                println!("Latest content at group {}, object {}",
                    largest.group, largest.object);
            }
        }
    }
    ControlMessage::SubscribeError(sub_err) => {
        eprintln!("Subscription failed: {}", sub_err.reason_phrase);
        return Err("Subscribe error".into());
    }
    // Any other message at this point is treated as an error.
    _ => return Err("Unexpected message type".into()),
}
Receiving Media Data
Processing Control Messages
Continuously process control messages in a loop:
use moqtail::model::control::track_status::TrackStatus;
use moqtail::model::control::subscribe_update::SubscribeUpdate;
// Run the control-message loop on its own task; `control_handler` is moved
// into the task and is no longer usable by the spawning code.
tokio::spawn(async move {
    loop {
        // Stage 1: receive the next message, or stop on any termination code.
        let message = match control_handler.next_message().await {
            Ok(message) => message,
            Err(TerminationCode::NoError) => {
                println!("Connection closed cleanly");
                break;
            }
            Err(e) => {
                eprintln!("Control stream error: {:?}", e);
                break;
            }
        };

        // Stage 2: dispatch on the message type.
        match message {
            ControlMessage::TrackStatus(status) => {
                println!("Track status update received");
                // Handle track status
            }
            ControlMessage::SubscribeUpdate(update) => {
                println!("Subscribe parameters updated");
                // Handle subscription update
            }
            other => {
                println!("Received control message: {:?}", other.get_type());
            }
        }
    }
});
Reading Data Streams
Accept and process incoming data streams:
use moqtail::transport::data_stream_handler::DataStreamHandler;
// Accept incoming unidirectional streams for data.
// Each accepted stream is drained on its own task so that a slow stream
// does not block accepting the next one.
loop {
    match connection.accept_uni().await {
        // `mut` is required: `read` borrows the stream mutably.
        Ok(mut recv_stream) => {
            tokio::spawn(async move {
                // Scratch buffer reused for every read on this stream.
                let mut buffer = vec![0u8; 4096];
                loop {
                    match recv_stream.read(&mut buffer).await {
                        Ok(Some(n)) => {
                            println!("Received {} bytes of media data", n);
                            // Process media data
                        }
                        // `None` means the stream has ended.
                        Ok(None) => {
                            println!("Stream closed");
                            break;
                        }
                        Err(e) => {
                            eprintln!("Stream error: {:?}", e);
                            break;
                        }
                    }
                }
            });
        }
        // Stop accepting once the connection reports an error.
        Err(e) => {
            eprintln!("Error accepting stream: {:?}", e);
            break;
        }
    }
}
Complete Client Example
Here’s a complete working client example:
use moqtail::model::common::pair::KeyValuePair;
use moqtail::model::common::tuple::{Tuple, TupleField};
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::constant::{DRAFT_14, GroupOrder};
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::error::TerminationCode;
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use wtransport::{ClientConfig, Endpoint};
use wtransport::endpoint::IntoConnectOptions;
use bytes::Bytes;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
// Connect to relay
let client_config = ClientConfig::builder()
.with_bind_default()
.with_server_certificate_hashes(vec![server_cert_hash])
.build();
let endpoint = Endpoint::client(client_config)?;
let connection = endpoint
.connect("https://relay.example.com:4433".into_options())
.await?;
// Setup control stream
let (send, recv) = connection.open_bi().await?.await?;
let mut control = ControlStreamHandler::new(send, recv);
// Send client setup
let client_setup = ClientSetup::new(
vec![DRAFT_14],
vec![KeyValuePair::try_new_bytes(1, Bytes::from_static(b"subscriber")).unwrap()],
);
control.send(&ControlMessage::ClientSetup(Box::new(client_setup))).await?;
// Receive server setup
match control.next_message().await? {
ControlMessage::ServerSetup(setup) => {
println!("Connected with version {}", setup.selected_version);
}
_ => return Err("Unexpected setup response".into()),
}
// Subscribe to track
let subscribe = Subscribe::new_latest_object(
1,
Tuple::from_utf8_path("conference/main"),
TupleField::from_utf8("video"),
128,
GroupOrder::Ascending,
true,
vec![],
);
control.send(&ControlMessage::Subscribe(Box::new(subscribe))).await?;
// Wait for subscribe OK
match control.next_message().await? {
ControlMessage::SubscribeOk(ok) => {
println!("Subscribed with track alias {}", ok.track_alias);
}
_ => return Err("Subscribe failed".into()),
}
// Process messages
loop {
match control.next_message().await {
Ok(msg) => println!("Received: {:?}", msg.get_type()),
Err(TerminationCode::NoError) => break,
Err(e) => {
eprintln!("Error: {:?}", e);
break;
}
}
}
Ok(())
}
Error Handling
Timeout Handling
Handle control message timeouts:
// Distinguish the two timeout termination codes from every other failure.
match control_handler.next_message().await {
    Ok(msg) => { /* process message */ }
    Err(TerminationCode::ControlMessageTimeout) => {
        eprintln!("Control message timeout - partial message not completed");
        // Reconnect or terminate
    }
    Err(TerminationCode::DataStreamTimeout) => {
        eprintln!("Data stream timeout");
        // Handle timeout
    }
    // Any other termination code is only logged.
    Err(e) => eprintln!("Error: {:?}", e),
}
Connection Recovery
Implement reconnection logic:
/// Connects to `url`, retrying after each failed attempt.
///
/// Up to `max_retries` retries are made after the first failure, so at most
/// `max_retries + 1` attempts happen in total, with a fixed 2-second pause
/// between attempts.
///
/// # Errors
/// Returns the error of the last attempt once all retries are used up.
async fn connect_with_retry(
    // `Endpoint` is generic over its side; a connecting endpoint is the client side.
    endpoint: &wtransport::Endpoint<wtransport::endpoint::endpoint_side::Client>,
    url: &str,
    max_retries: u32,
) -> Result<wtransport::Connection, Box<dyn std::error::Error>> {
    let mut retries = 0;
    loop {
        match endpoint.connect(url.into_options()).await {
            Ok(conn) => return Ok(conn),
            // Retry budget not yet exhausted: log, wait, and try again.
            Err(e) if retries < max_retries => {
                retries += 1;
                eprintln!("Connection failed (attempt {}): {:?}", retries, e);
                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
            }
            // Out of retries: surface the last error to the caller.
            Err(e) => return Err(e.into()),
        }
    }
}
Advanced Topics
Managing Multiple Subscriptions
Track multiple active subscriptions:
use std::collections::HashMap;
/// Bookkeeping for the subscriptions this client has requested.
struct SubscriptionManager {
    /// Maps each request ID to the `"namespace/name"` label of the track
    /// that was requested with it.
    active_subs: HashMap<u64, String>,
    /// Request ID that the next call to `subscribe` will use.
    next_request_id: u64,
}
impl SubscriptionManager {
    /// Creates an empty manager whose first request ID is 0.
    ///
    /// In MOQT draft-14 a client's request IDs start at 0 and are always even.
    fn new() -> Self {
        Self {
            active_subs: HashMap::new(),
            next_request_id: 0,
        }
    }

    /// Sends a "latest object" subscribe request for `namespace`/`name` on
    /// `control` and records it under a freshly allocated request ID.
    ///
    /// The subscription is recorded as soon as the request has been sent;
    /// this method does not wait for the relay's response.
    ///
    /// # Errors
    /// Returns an error if sending the request fails. The request ID is
    /// consumed even then, and nothing is recorded in `active_subs`.
    async fn subscribe(
        &mut self,
        control: &mut ControlStreamHandler,
        namespace: &str,
        name: &str,
    ) -> Result<u64, Box<dyn std::error::Error>> {
        let request_id = self.next_request_id;
        // Step by 2 so every ID issued by this client stays even.
        self.next_request_id += 2;

        let subscribe = Subscribe::new_latest_object(
            request_id,
            Tuple::from_utf8_path(namespace),
            TupleField::from_utf8(name),
            128,
            GroupOrder::Ascending,
            true,
            vec![],
        );
        control.send(&ControlMessage::Subscribe(Box::new(subscribe))).await?;

        self.active_subs.insert(request_id, format!("{}/{}", namespace, name));
        Ok(request_id)
    }
}
Next Steps
Transport Layer
Deep dive into control and data stream handlers
Protocol Model
Explore MOQT message types and data structures
Relay Server
Build a relay server to forward streams