Documentation index: the complete documentation index is available at https://mintlify.com/moqtail/moqtail/llms.txt.
Use that file to discover all available pages before exploring further.
Transport
The transport module provides low-level handlers for MOQT control and data streams. It manages the sending and receiving of protocol messages over QUIC streams, including message framing, timeout handling, and stream lifecycle management.
Module Structure
pub mod control_stream_handler ; // Control message stream handling
pub mod data_stream_handler ; // Data stream handling
Control Stream Handler
The ControlStreamHandler manages bidirectional control streams for MOQT protocol messages.
Creating a Handler
Create a control stream handler from QUIC streams:
use moqtail :: transport :: control_stream_handler :: ControlStreamHandler ;
use wtransport :: { SendStream , RecvStream };
// From a QUIC bidirectional stream
let ( send_stream , recv_stream ) = connection . open_bi () . await ?. await ? ;
let mut control_handler = ControlStreamHandler :: new ( send_stream , recv_stream );
Sending Messages
Send control messages to the peer:
use moqtail::model::common::tuple::{Tuple, TupleField};
use moqtail::model::control::constant::GroupOrder;
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::error::TerminationCode;

// Create a message.
// Arguments are positional; see `Subscribe::new_latest_object` for their meaning.
let subscribe = Subscribe::new_latest_object(
    1,
    Tuple::from_utf8_path("conference/room1"),
    TupleField::from_utf8("video"),
    128,
    GroupOrder::Ascending,
    true,
    vec![],
);
// Control messages carry their payload boxed inside the `ControlMessage` enum.
let message = ControlMessage::Subscribe(Box::new(subscribe));

// Send the message; failures are reported as a `TerminationCode`.
match control_handler.send(&message).await {
    Ok(()) => println!("Message sent successfully"),
    Err(TerminationCode::InternalError) => {
        eprintln!("Failed to send message");
    }
    Err(e) => eprintln!("Send error: {:?}", e),
}
Receiving Messages
Receive and parse incoming control messages:
use moqtail :: model :: control :: control_message :: ControlMessage ;
use moqtail :: model :: error :: TerminationCode ;
// Receive the next message (awaits asynchronously until one arrives or an error occurs)
match control_handler . next_message () . await {
Ok ( ControlMessage :: Subscribe ( subscribe )) => {
println! ( "Received subscribe for {}" , subscribe . track_name);
}
Ok ( ControlMessage :: SubscribeOk ( sub_ok )) => {
println! ( "Subscribe succeeded, track alias: {}" , sub_ok . track_alias);
}
Ok ( msg ) => {
println! ( "Received message type: {:?}" , msg . get_type ());
}
Err ( TerminationCode :: NoError ) => {
println! ( "Peer closed connection cleanly" );
}
Err ( TerminationCode :: ControlMessageTimeout ) => {
eprintln! ( "Timeout waiting for complete message" );
}
Err ( TerminationCode :: ProtocolViolation ) => {
eprintln! ( "Protocol violation detected" );
}
Err ( e ) => {
eprintln! ( "Error receiving message: {:?}" , e );
}
}
Message Processing Loop
Continuously process control messages:
use moqtail :: model :: error :: TerminationCode ;
/// Drives the control stream until the peer closes it or receiving fails.
///
/// Subscribe, Publish and Unsubscribe messages are dispatched to their
/// handlers; every other message is only logged. A `TerminationCode::NoError`
/// from `next_message` is logged as a clean close. Both a clean close and any
/// other receive error end the loop and yield `Ok(())`; only a handler failure
/// is propagated as `Err`.
async fn handle_control_messages(
    mut control: ControlStreamHandler,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // First settle the receive outcome, so dispatch below only sees messages.
        let incoming = match control.next_message().await {
            Ok(incoming) => incoming,
            Err(TerminationCode::NoError) => {
                println!("Connection closed cleanly");
                return Ok(());
            }
            Err(e) => {
                eprintln!("Error: {:?}", e);
                return Ok(());
            }
        };

        match incoming {
            ControlMessage::Subscribe(sub) => {
                // Handle subscribe
                handle_subscribe(&mut control, sub).await?;
            }
            ControlMessage::Publish(pub_msg) => {
                // Handle publish
                handle_publish(&mut control, pub_msg).await?;
            }
            ControlMessage::Unsubscribe(unsub) => {
                // Handle unsubscribe
                handle_unsubscribe(&mut control, unsub).await?;
            }
            other => {
                println!("Received: {:?}", other.get_type());
            }
        }
    }
}
Timeout Handling
The control stream handler implements automatic timeout handling for partial messages.
Timeout Configuration
The handler uses a 5-second timeout for incomplete messages:
// Internal timeout constant
const CONTROL_MESSAGE_TIMEOUT : Duration = Duration :: from_secs ( 5 );
Partial Message Handling
When a message is partially received:
1. A deadline is set (current time + 5 seconds).
2. The handler continues reading data.
3. If the message completes, the deadline is cleared.
4. If the deadline expires, a ControlMessageTimeout error is returned.
// The handler automatically manages partial messages
match control . next_message () . await {
Ok ( msg ) => {
// Complete message received
}
Err ( TerminationCode :: ControlMessageTimeout ) => {
// Partial message didn't complete within 5 seconds
eprintln! ( "Timeout waiting for complete message" );
// Connection should be terminated
}
Err ( e ) => eprintln! ( "Other error: {:?}" , e ),
}
Stream Priority
Control streams should be set to maximum priority:
use wtransport :: SendStream ;
// On the server side when accepting a control stream
let ( send_stream , recv_stream ) = connection . accept_bi () . await ? ;
// Set control stream to max priority
send_stream . set_priority ( i32 :: MAX );
let control = ControlStreamHandler :: new ( send_stream , recv_stream );
Error Handling
Connection Errors
Handle different types of connection errors:
use moqtail :: model :: error :: TerminationCode ;
use wtransport :: error :: StreamReadError ;
match control . next_message () . await {
// Clean disconnection
Err ( TerminationCode :: NoError ) => {
println! ( "Client disconnected normally" );
// Don't try to close the connection
}
// Protocol violation
Err ( TerminationCode :: ProtocolViolation ) => {
eprintln! ( "Protocol violation - malformed message" );
// Should terminate connection
}
// Timeout
Err ( TerminationCode :: ControlMessageTimeout ) => {
eprintln! ( "Message timeout - partial data not completed" );
// Terminate connection
}
// Internal error
Err ( TerminationCode :: InternalError ) => {
eprintln! ( "Internal error during message processing" );
// Terminate connection
}
Ok ( msg ) => { /* process message */ }
Err ( e ) => eprintln! ( "Other error: {:?}" , e ),
}
Data Stream Handler
The data_stream_handler module handles MOQT data streams for media delivery.
The data stream handler implementation is currently minimal. Data streams are typically managed at the application level using raw QUIC streams.
Accepting Data Streams
Accept incoming unidirectional data streams:
use wtransport::{Connection, RecvStream};

/// Accepts incoming unidirectional streams on `connection` until accepting fails.
///
/// Every accepted stream is passed to `handle_data_stream` on its own Tokio
/// task, so this loop can go straight back to accepting the next stream. The
/// first accept error is logged and ends the loop.
async fn accept_data_streams(connection: Connection) {
    loop {
        match connection.accept_uni().await {
            // No `mut` binding needed here: the stream is only moved into the task.
            Ok(recv_stream) => {
                tokio::spawn(async move {
                    handle_data_stream(recv_stream).await;
                });
            }
            Err(e) => {
                eprintln!("Error accepting stream: {:?}", e);
                break;
            }
        }
    }
}
async fn handle_data_stream ( mut stream : RecvStream ) {
let mut buffer = vec! [ 0 u8 ; 4096 ];
loop {
match stream . read ( & mut buffer ) . await {
Ok ( Some ( n )) => {
println! ( "Received {} bytes" , n );
// Process data
}
Ok ( None ) => {
println! ( "Stream closed" );
break ;
}
Err ( e ) => {
eprintln! ( "Read error: {:?}" , e );
break ;
}
}
}
}
Opening Data Streams
Send data on unidirectional streams:
use wtransport :: Connection ;
use bytes :: Bytes ;
/// Sends `data` to the peer on a freshly opened unidirectional stream.
///
/// The stream is opened, the whole payload is written, and the stream is then
/// finished. Any failure along the way is returned to the caller.
async fn send_data(
    connection: &Connection,
    data: Bytes,
) -> Result<(), Box<dyn std::error::Error>> {
    // Opening is two-step: request the stream, then wait for it to be ready.
    let opening = connection.open_uni().await?;
    let mut outgoing = opening.await?;

    // Write the full payload, then close the stream.
    outgoing.write_all(&data).await?;
    outgoing.finish().await?;

    Ok(())
}
Internal Implementation Details
Buffer Management
The control stream handler uses efficient buffering:
// Internal structure (for reference)
// State kept by the handler for one control stream (internal layout, shown for reference only).
struct ControlStreamHandler {
// Sending half of the control stream.
send : SendStream ,
// Receiving half of the control stream.
recv : RecvStream ,
recv_bytes : BytesMut , // Accumulated received bytes
recv_buf : Box <[ u8 ; MTU_SIZE ]>, // Read buffer (1500 bytes)
// Deadline by which a partially received message must complete; `None` when no deadline is set.
partial_message_deadline : Option < Instant >,
}
Message Parsing
Messages are parsed incrementally:
1. Read bytes from the stream into the buffer.
2. Attempt to deserialize a complete message.
3. If successful, remove the parsed bytes from the buffer.
4. If incomplete, wait for more data (with a timeout).
5. Return the parsed message or an error.
// Conceptual flow (internal to handler)
// Keep trying to parse, reading more data between attempts, until a message or an error results.
loop {
// Only attempt a parse once some bytes have been buffered.
if ! recv_bytes . is_empty () {
// The parser is handed a clone, so `recv_bytes` itself is not modified by this call.
// NOTE(review): removing the parsed bytes from `recv_bytes` after a successful parse
// is not shown in this sketch.
match ControlMessage :: deserialize ( & mut recv_bytes . clone ()) {
Ok ( msg ) => return Ok ( msg ),
// Not an error: the buffered bytes do not yet form a complete message.
Err ( ParseError :: NotEnoughBytes { .. }) => {
// Set deadline if not set
// Continue reading
}
// Any other parse error is returned to the caller.
Err ( e ) => return Err ( e ),
}
}
// Read more data
read_more_data () . await ? ;
}
Complete Example
Here’s a complete example using the transport layer:
use moqtail :: model :: common :: pair :: KeyValuePair ;
use moqtail :: model :: common :: tuple :: { Tuple , TupleField };
use moqtail :: model :: control :: client_setup :: ClientSetup ;
use moqtail :: model :: control :: constant :: { DRAFT_14 , GroupOrder };
use moqtail :: model :: control :: control_message :: ControlMessage ;
use moqtail :: model :: control :: subscribe :: Subscribe ;
use moqtail :: model :: error :: TerminationCode ;
use moqtail :: transport :: control_stream_handler :: ControlStreamHandler ;
use wtransport :: { ClientConfig , Endpoint };
use wtransport :: endpoint :: IntoConnectOptions ;
use bytes :: Bytes ;
/// End-to-end client example: connect to a relay, perform the setup exchange
/// on the control stream, subscribe to a track, then log every control message
/// until the stream ends.
///
/// NOTE: `cert_hash` is not defined in this example; provide the certificate
/// hash of the server you are connecting to.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Setup connection
    let client_config = ClientConfig::builder()
        .with_bind_default()
        .with_server_certificate_hashes(vec![cert_hash])
        .build();
    // The endpoint stays bound to a local so it lives for the whole of `main`.
    let endpoint = Endpoint::client(client_config)?;
    let connect_options = "https://relay.example.com:4433".into_options();
    let connection = endpoint.connect(connect_options).await?;

    // Create control stream
    let (send_half, recv_half) = connection.open_bi().await?.await?;
    let mut control = ControlStreamHandler::new(send_half, recv_half);

    // Send client setup
    let client_setup = ClientSetup::new(
        vec![DRAFT_14],
        vec![KeyValuePair::try_new_bytes(1, Bytes::from_static(b"client")).unwrap()],
    );
    control
        .send(&ControlMessage::ClientSetup(Box::new(client_setup)))
        .await?;

    // Receive server setup: anything other than a ServerSetup aborts the example
    if let ControlMessage::ServerSetup(_) = control.next_message().await? {
        println!("Setup complete");
    } else {
        return Err("Unexpected message".into());
    }

    // Subscribe to track
    let subscription = Subscribe::new_latest_object(
        1,
        Tuple::from_utf8_path("test/track"),
        TupleField::from_utf8("video"),
        128,
        GroupOrder::Ascending,
        true,
        vec![],
    );
    control
        .send(&ControlMessage::Subscribe(Box::new(subscription)))
        .await?;

    // Process messages until the peer closes the stream or receiving fails
    loop {
        let incoming = match control.next_message().await {
            Ok(incoming) => incoming,
            // Clean close: stop without logging an error
            Err(TerminationCode::NoError) => break,
            Err(e) => {
                eprintln!("Error: {:?}", e);
                break;
            }
        };
        println!("Received: {:?}", incoming.get_type());
    }

    Ok(())
}
Buffer Sizes
The handler uses MTU-sized buffers (1500 bytes) for efficient network I/O:
const MTU_SIZE : usize = 1500 ;
Message Serialization
The payload length is encoded as a u16, so a message payload can be at most 65535 bytes:
// Maximum message payload
let payload_len : u16 = payload . len () . try_into () ? ;
Zero-Copy Operations
The handler uses Bytes and BytesMut for zero-copy buffer operations where possible.
Testing
The transport layer includes comprehensive tests:
/// Checks that a serialized publish-namespace message written on the server
/// side is received by the control plane as an equal
/// `ControlMessage::PublishNamespace`.
#[tokio::test]
async fn test_successful_message() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new().await?;
    let (mut plane, mut server_send) = setup.create_control_plane().await?;

    // Send message from server
    let expected = Box::new(create_test_publish_namespace());
    let wire_bytes = expected.serialize().unwrap();
    server_send.write_all(&wire_bytes).await?;

    // Receive on client: it must decode to exactly the message that was sent
    if let ControlMessage::PublishNamespace(actual) = plane.next_message().await.unwrap() {
        assert_eq!(actual, expected);
    } else {
        panic!("Wrong message type");
    }

    Ok(())
}
Next Steps
- Protocol Model: Understand the message types being transported.
- Client Guide: Build client applications using transport handlers.
- Relay Guide: Implement relay servers with transport handlers.