Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt

Use this file to discover all available pages before exploring further.

Transport

The transport module provides low-level handlers for MOQT control and data streams. It manages the sending and receiving of protocol messages over QUIC streams, including message framing, timeout handling, and stream lifecycle management.

Module Structure

pub mod control_stream_handler;  // Control message stream handling
pub mod data_stream_handler;     // Data stream handling

Control Stream Handler

The ControlStreamHandler manages bidirectional control streams for MOQT protocol messages.

Creating a Handler

Create a control stream handler from QUIC streams:
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use wtransport::{SendStream, RecvStream};

// From a QUIC bidirectional stream
let (send_stream, recv_stream) = connection.open_bi().await?.await?;

let mut control_handler = ControlStreamHandler::new(send_stream, recv_stream);

Sending Messages

Send control messages to the peer:
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::error::TerminationCode;

// Create a message
let subscribe = Subscribe::new_latest_object(
    1,
    Tuple::from_utf8_path("conference/room1"),
    TupleField::from_utf8("video"),
    128,
    GroupOrder::Ascending,
    true,
    vec![],
);

let message = ControlMessage::Subscribe(Box::new(subscribe));

// Send the message
match control_handler.send(&message).await {
    Ok(()) => println!("Message sent successfully"),
    Err(TerminationCode::InternalError) => {
        eprintln!("Failed to send message");
    }
    Err(e) => eprintln!("Send error: {:?}", e),
}

Receiving Messages

Receive and parse incoming control messages:
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::error::TerminationCode;

// Receive next message (blocking async)
match control_handler.next_message().await {
    Ok(ControlMessage::Subscribe(subscribe)) => {
        println!("Received subscribe for {}", subscribe.track_name);
    }
    Ok(ControlMessage::SubscribeOk(sub_ok)) => {
        println!("Subscribe succeeded, track alias: {}", sub_ok.track_alias);
    }
    Ok(msg) => {
        println!("Received message type: {:?}", msg.get_type());
    }
    Err(TerminationCode::NoError) => {
        println!("Peer closed connection cleanly");
    }
    Err(TerminationCode::ControlMessageTimeout) => {
        eprintln!("Timeout waiting for complete message");
    }
    Err(TerminationCode::ProtocolViolation) => {
        eprintln!("Protocol violation detected");
    }
    Err(e) => {
        eprintln!("Error receiving message: {:?}", e);
    }
}

Message Processing Loop

Continuously process control messages:
use moqtail::model::error::TerminationCode;

async fn handle_control_messages(
    mut control: ControlStreamHandler,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        match control.next_message().await {
            Ok(ControlMessage::Subscribe(sub)) => {
                // Handle subscribe
                handle_subscribe(&mut control, sub).await?;
            }
            Ok(ControlMessage::Publish(pub_msg)) => {
                // Handle publish
                handle_publish(&mut control, pub_msg).await?;
            }
            Ok(ControlMessage::Unsubscribe(unsub)) => {
                // Handle unsubscribe
                handle_unsubscribe(&mut control, unsub).await?;
            }
            Ok(msg) => {
                println!("Received: {:?}", msg.get_type());
            }
            Err(TerminationCode::NoError) => {
                println!("Connection closed cleanly");
                break;
            }
            Err(e) => {
                eprintln!("Error: {:?}", e);
                break;
            }
        }
    }
    
    Ok(())
}

Timeout Handling

The control stream handler implements automatic timeout handling for partial messages.

Timeout Configuration

The handler uses a 5-second timeout for incomplete messages:
// Internal timeout constant
const CONTROL_MESSAGE_TIMEOUT: Duration = Duration::from_secs(5);

Partial Message Handling

When a message is partially received:
  1. A deadline is set (current time + 5 seconds)
  2. The handler continues reading data
  3. If the message completes, the deadline is cleared
  4. If the deadline expires, ControlMessageTimeout error is returned
// The handler automatically manages partial messages
match control.next_message().await {
    Ok(msg) => {
        // Complete message received
    }
    Err(TerminationCode::ControlMessageTimeout) => {
        // Partial message didn't complete within 5 seconds
        eprintln!("Timeout waiting for complete message");
        // Connection should be terminated
    }
    Err(e) => eprintln!("Other error: {:?}", e),
}

Stream Priority

Control streams should be set to maximum priority:
use wtransport::SendStream;

// On the server side when accepting a control stream
let (send_stream, recv_stream) = connection.accept_bi().await?;

// Set control stream to max priority
send_stream.set_priority(i32::MAX);

let control = ControlStreamHandler::new(send_stream, recv_stream);

Error Handling

Connection Errors

Handle different types of connection errors:
use moqtail::model::error::TerminationCode;
use wtransport::error::StreamReadError;

match control.next_message().await {
    // Clean disconnection
    Err(TerminationCode::NoError) => {
        println!("Client disconnected normally");
        // Don't try to close the connection
    }
    
    // Protocol violation
    Err(TerminationCode::ProtocolViolation) => {
        eprintln!("Protocol violation - malformed message");
        // Should terminate connection
    }
    
    // Timeout
    Err(TerminationCode::ControlMessageTimeout) => {
        eprintln!("Message timeout - partial data not completed");
        // Terminate connection
    }
    
    // Internal error
    Err(TerminationCode::InternalError) => {
        eprintln!("Internal error during message processing");
        // Terminate connection
    }
    
    Ok(msg) => { /* process message */ }
    Err(e) => eprintln!("Other error: {:?}", e),
}

Data Stream Handler

The data_stream_handler module handles MOQT data streams for media delivery.
The data stream handler implementation is currently minimal. Data streams are typically managed at the application level using raw QUIC streams.

Accepting Data Streams

Accept incoming unidirectional data streams:
use wtransport::Connection;

async fn accept_data_streams(connection: Connection) {
    loop {
        match connection.accept_uni().await {
            Ok(mut recv_stream) => {
                tokio::spawn(async move {
                    handle_data_stream(recv_stream).await;
                });
            }
            Err(e) => {
                eprintln!("Error accepting stream: {:?}", e);
                break;
            }
        }
    }
}

async fn handle_data_stream(mut stream: RecvStream) {
    let mut buffer = vec![0u8; 4096];
    
    loop {
        match stream.read(&mut buffer).await {
            Ok(Some(n)) => {
                println!("Received {} bytes", n);
                // Process data
            }
            Ok(None) => {
                println!("Stream closed");
                break;
            }
            Err(e) => {
                eprintln!("Read error: {:?}", e);
                break;
            }
        }
    }
}

Opening Data Streams

Send data on unidirectional streams:
use wtransport::Connection;
use bytes::Bytes;

async fn send_data(
    connection: &Connection,
    data: Bytes,
) -> Result<(), Box<dyn std::error::Error>> {
    // Open unidirectional stream
    let mut send_stream = connection.open_uni().await?.await?;
    
    // Write data
    send_stream.write_all(&data).await?;
    
    // Close stream
    send_stream.finish().await?;
    
    Ok(())
}

Internal Implementation Details

Buffer Management

The control stream handler uses efficient buffering:
// Internal structure (for reference)
struct ControlStreamHandler {
    send: SendStream,
    recv: RecvStream,
    recv_bytes: BytesMut,           // Accumulated received bytes
    recv_buf: Box<[u8; MTU_SIZE]>,  // Read buffer (1500 bytes)
    partial_message_deadline: Option<Instant>,
}

Message Parsing

Messages are parsed incrementally:
  1. Read bytes from stream into buffer
  2. Attempt to deserialize a complete message
  3. If successful, remove parsed bytes from buffer
  4. If incomplete, wait for more data (with timeout)
  5. Return parsed message or error
// Conceptual flow (internal to handler)
loop {
    if !recv_bytes.is_empty() {
        match ControlMessage::deserialize(&mut recv_bytes.clone()) {
            Ok(msg) => return Ok(msg),
            Err(ParseError::NotEnoughBytes { .. }) => {
                // Set deadline if not set
                // Continue reading
            }
            Err(e) => return Err(e),
        }
    }
    
    // Read more data
    read_more_data().await?;
}

Complete Example

Here’s a complete example using the transport layer:
use moqtail::model::common::pair::KeyValuePair;
use moqtail::model::common::tuple::{Tuple, TupleField};
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::constant::{DRAFT_14, GroupOrder};
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::error::TerminationCode;
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use wtransport::{ClientConfig, Endpoint};
use wtransport::endpoint::IntoConnectOptions;
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Setup connection
    let config = ClientConfig::builder()
        .with_bind_default()
        .with_server_certificate_hashes(vec![cert_hash])
        .build();
    
    let endpoint = Endpoint::client(config)?;
    let connection = endpoint
        .connect("https://relay.example.com:4433".into_options())
        .await?;
    
    // Create control stream
    let (send, recv) = connection.open_bi().await?.await?;
    let mut control = ControlStreamHandler::new(send, recv);
    
    // Send client setup
    let setup = ClientSetup::new(
        vec![DRAFT_14],
        vec![KeyValuePair::try_new_bytes(1, Bytes::from_static(b"client")).unwrap()],
    );
    control.send(&ControlMessage::ClientSetup(Box::new(setup))).await?;
    
    // Receive server setup
    match control.next_message().await? {
        ControlMessage::ServerSetup(_) => println!("Setup complete"),
        _ => return Err("Unexpected message".into()),
    }
    
    // Subscribe to track
    let sub = Subscribe::new_latest_object(
        1,
        Tuple::from_utf8_path("test/track"),
        TupleField::from_utf8("video"),
        128,
        GroupOrder::Ascending,
        true,
        vec![],
    );
    control.send(&ControlMessage::Subscribe(Box::new(sub))).await?;
    
    // Process messages
    loop {
        match control.next_message().await {
            Ok(msg) => println!("Received: {:?}", msg.get_type()),
            Err(TerminationCode::NoError) => break,
            Err(e) => {
                eprintln!("Error: {:?}", e);
                break;
            }
        }
    }
    
    Ok(())
}

Performance Considerations

Buffer Sizes

The handler uses MTU-sized buffers (1500 bytes) for efficient network I/O:
const MTU_SIZE: usize = 1500;

Message Serialization

Messages have a maximum payload length of 65535 bytes (u16):
// Maximum message payload
let payload_len: u16 = payload.len().try_into()?;

Zero-Copy Operations

The handler uses Bytes and BytesMut for zero-copy buffer operations where possible.

Testing

The transport layer includes comprehensive tests:
#[tokio::test]
async fn test_successful_message() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new().await?;
    let (mut plane, mut server_send) = setup.create_control_plane().await?;
    
    // Send message from server
    let announce = Box::new(create_test_publish_namespace());
    let bytes = announce.serialize().unwrap();
    server_send.write_all(&bytes).await?;
    
    // Receive on client
    let received = plane.next_message().await.unwrap();
    
    match received {
        ControlMessage::PublishNamespace(rec) => assert_eq!(rec, announce),
        _ => panic!("Wrong message type"),
    }
    
    Ok(())
}

Next Steps

Protocol Model

Understand the message types being transported

Client Guide

Build client applications using transport handlers

Relay Guide

Implement relay servers with transport handlers