scuffle_rtmp/
lib.rs

1//! A crate for handling RTMP server connections.
2#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
3//! ## Specifications
4//!
5//! | Name | Version | Link | Comments |
6//! | --- | --- | --- | --- |
7//! | Adobe’s Real Time Messaging Protocol | `1.0` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/legacy/rtmp-v1-0-spec.pdf> | Referred to as 'Legacy RTMP spec' in this documentation |
8//! | Enhancing RTMP, FLV | `v1-2024-02-29-r1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v1.pdf> | |
9//! | Enhanced RTMP | `v2-2024-10-22-b1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v2.pdf> | Referred to as 'Enhanced RTMP spec' in this documentation |
10#![cfg_attr(feature = "docs", doc = "## Feature flags")]
11#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
12//! ## Example
13//!
14//! ```no_run
15//! # use std::io::Cursor;
16//! #
17//! # use scuffle_rtmp::ServerSession;
18//! # use scuffle_rtmp::session::server::{ServerSessionError, SessionData, SessionHandler};
19//! # use tokio::net::TcpListener;
20//! #
21//! struct Handler;
22//!
23//! impl SessionHandler for Handler {
24//!     async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
25//!         // Handle incoming video/audio/meta data
26//!         Ok(())
27//!     }
28//!
29//!     async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
30//!         // Handle the publish event
31//!         Ok(())
32//!     }
33//!
34//!     async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
35//!         // Handle the unpublish event
36//!         Ok(())
37//!     }
38//! }
39//!
40//! #[tokio::main]
41//! async fn main() {
42//!     let listener = TcpListener::bind("[::]:1935").await.unwrap();
43//!     // listening on [::]:1935
44//!
45//!     while let Ok((stream, addr)) = listener.accept().await {
46//!         let session = ServerSession::new(stream, Handler);
47//!
48//!         tokio::spawn(async move {
49//!             if let Err(err) = session.run().await {
50//!                 // Handle the session error
51//!             }
52//!         });
53//!     }
54//! }
55//! ```
56//!
57//! ## License
58//!
59//! This project is licensed under either the MIT or the Apache-2.0 license.
60//! You may choose whichever of the two you prefer when using this work.
61//!
62//! `SPDX-License-Identifier: MIT OR Apache-2.0`
63#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
64#![cfg_attr(docsrs, feature(doc_auto_cfg))]
65#![deny(missing_docs)]
66#![deny(unsafe_code)]
67#![deny(unreachable_pub)]
68#![deny(clippy::mod_module_files)]
69
70pub mod chunk;
71pub mod command_messages;
72pub mod error;
73pub mod handshake;
74pub mod messages;
75pub mod protocol_control_messages;
76pub mod session;
77pub mod user_control_messages;
78
79pub use session::server::ServerSession;
80
81/// Changelogs generated by [scuffle_changelog]
82#[cfg(feature = "docs")]
83#[scuffle_changelog::changelog]
84pub mod changelog {}
85
86#[cfg(test)]
87#[cfg_attr(all(test, coverage_nightly), coverage(off))]
88mod tests {
89    use std::path::PathBuf;
90    use std::time::Duration;
91
92    use scuffle_future_ext::FutureExt;
93    use tokio::process::Command;
94    use tokio::sync::{mpsc, oneshot};
95
96    use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
97
98    fn file_path(item: &str) -> PathBuf {
99        if let Some(env) = std::env::var_os("ASSETS_DIR") {
100            PathBuf::from(env).join(item)
101        } else {
102            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("../../assets/{item}"))
103        }
104    }
105
106    enum Event {
107        Publish {
108            stream_id: u32,
109            app_name: String,
110            stream_name: String,
111            response: oneshot::Sender<Result<(), ServerSessionError>>,
112        },
113        Unpublish {
114            stream_id: u32,
115            response: oneshot::Sender<Result<(), ServerSessionError>>,
116        },
117        Data {
118            stream_id: u32,
119            data: SessionData,
120            response: oneshot::Sender<Result<(), ServerSessionError>>,
121        },
122    }
123
124    struct Handler(mpsc::Sender<Event>);
125
126    impl SessionHandler for Handler {
127        async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
128            let (response, reciever) = oneshot::channel();
129
130            self.0
131                .send(Event::Publish {
132                    stream_id,
133                    app_name: app_name.to_string(),
134                    stream_name: stream_name.to_string(),
135                    response,
136                })
137                .await
138                .unwrap();
139
140            reciever.await.unwrap()
141        }
142
143        async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
144            let (response, reciever) = oneshot::channel();
145
146            self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
147
148            reciever.await.unwrap()
149        }
150
151        async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
152            let (response, reciever) = oneshot::channel();
153            self.0
154                .send(Event::Data {
155                    stream_id,
156                    data,
157                    response,
158                })
159                .await
160                .unwrap();
161
162            reciever.await.unwrap()
163        }
164    }
165
166    #[cfg(not(valgrind))] // test is time-sensitive, consider refactoring?
167    #[tokio::test]
168    async fn test_basic_rtmp_clean() {
169        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
170        let addr = listener.local_addr().unwrap();
171
172        let _ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
173            .args([
174                "-loglevel",
175                "debug",
176                "-re",
177                "-i",
178                file_path("avc_aac.mp4").to_str().expect("failed to get path"),
179                "-r",
180                "30",
181                "-t",
182                "1", // just for the test so it doesn't take too long
183                "-c",
184                "copy",
185                "-f",
186                "flv",
187                &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
188            ])
189            .stdout(std::process::Stdio::inherit())
190            .stderr(std::process::Stdio::inherit())
191            .spawn()
192            .expect("failed to execute ffmpeg");
193
194        let (ffmpeg_stream, _) = listener
195            .accept()
196            .with_timeout(Duration::from_millis(1000))
197            .await
198            .expect("timed out")
199            .expect("failed to accept");
200
201        let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
202            let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
203            let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
204
205            (
206                tokio::spawn(async move {
207                    let r = session.run().await;
208                    println!("ffmpeg session ended: {r:?}");
209                    r
210                }),
211                ffmpeg_event_reciever,
212            )
213        };
214
215        let event = ffmpeg_event_reciever
216            .recv()
217            .with_timeout(Duration::from_millis(1000))
218            .await
219            .expect("timed out")
220            .expect("failed to recv event");
221
222        match event {
223            Event::Publish {
224                stream_id,
225                app_name,
226                stream_name,
227                response,
228            } => {
229                assert_eq!(stream_id, 1);
230                assert_eq!(app_name, "live");
231                assert_eq!(stream_name, "stream-key");
232                response.send(Ok(())).expect("failed to send response");
233            }
234            _ => panic!("unexpected event"),
235        }
236
237        let mut got_video = false;
238        let mut got_audio = false;
239        let mut got_metadata = false;
240
241        while let Some(data) = ffmpeg_event_reciever
242            .recv()
243            .with_timeout(Duration::from_millis(1000))
244            .await
245            .expect("timed out")
246        {
247            match data {
248                Event::Data {
249                    stream_id,
250                    response,
251                    data,
252                    ..
253                } => {
254                    match data {
255                        SessionData::Video { .. } => got_video = true,
256                        SessionData::Audio { .. } => got_audio = true,
257                        SessionData::Amf0 { .. } => got_metadata = true,
258                    }
259                    response.send(Ok(())).expect("failed to send response");
260                    assert_eq!(stream_id, 1);
261                }
262                Event::Unpublish { stream_id, response } => {
263                    assert_eq!(stream_id, 1);
264                    response.send(Ok(())).expect("failed to send response");
265                    break;
266                }
267                _ => panic!("unexpected event"),
268            }
269        }
270
271        assert!(got_video);
272        assert!(got_audio);
273        assert!(got_metadata);
274
275        if ffmpeg_event_reciever
276            .recv()
277            .with_timeout(Duration::from_millis(1000))
278            .await
279            .expect("timed out")
280            .is_some()
281        {
282            panic!("unexpected event");
283        }
284
285        assert!(
286            ffmpeg_handle
287                .await
288                .expect("failed to join handle")
289                .expect("failed to handle ffmpeg connection")
290        );
291
292        // TODO: Fix this assertion
293        // assert!(ffmpeg.try_wait().expect("failed to wait for ffmpeg").is_none());
294    }
295
296    // test is time-sensitive, consider refactoring?
297    // windows seems to not let us kill ffmpeg without it cleaning up the stream.
298    #[cfg(all(not(valgrind), not(windows)))]
299    #[tokio::test]
300    async fn test_basic_rtmp_unclean() {
301        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
302        let addr = listener.local_addr().unwrap();
303
304        println!("ffmpeg from {}", std::env::var("FFMPEG").unwrap_or("ffmpeg".into()));
305
306        let mut ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
307            .args([
308                "-loglevel",
309                "debug",
310                "-re",
311                "-i",
312                file_path("avc_aac.mp4").to_str().expect("failed to get path"),
313                "-r",
314                "30",
315                "-t",
316                "1", // just for the test so it doesn't take too long
317                "-c",
318                "copy",
319                "-f",
320                "flv",
321                &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
322            ])
323            .stdout(std::process::Stdio::inherit())
324            .stderr(std::process::Stdio::inherit())
325            .spawn()
326            .expect("failed to execute ffmpeg");
327
328        let (ffmpeg_stream, _) = listener
329            .accept()
330            .with_timeout(Duration::from_millis(1000))
331            .await
332            .expect("timed out")
333            .expect("failed to accept");
334
335        let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
336            let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
337            let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
338
339            (
340                tokio::spawn(async move {
341                    let r = session.run().await;
342                    println!("ffmpeg session ended: {r:?}");
343                    r
344                }),
345                ffmpeg_event_reciever,
346            )
347        };
348
349        let event = ffmpeg_event_reciever
350            .recv()
351            .with_timeout(Duration::from_millis(1000))
352            .await
353            .expect("timed out")
354            .expect("failed to recv event");
355
356        match event {
357            Event::Publish {
358                stream_id,
359                app_name,
360                stream_name,
361                response,
362            } => {
363                assert_eq!(stream_id, 1);
364                assert_eq!(app_name, "live");
365                assert_eq!(stream_name, "stream-key");
366                response.send(Ok(())).expect("failed to send response");
367            }
368            _ => panic!("unexpected event"),
369        }
370
371        let mut got_video = false;
372        let mut got_audio = false;
373        let mut got_metadata = false;
374
375        while let Some(data) = ffmpeg_event_reciever
376            .recv()
377            .with_timeout(Duration::from_millis(1000))
378            .await
379            .expect("timed out")
380        {
381            match data {
382                Event::Data {
383                    stream_id,
384                    response,
385                    data,
386                    ..
387                } => {
388                    assert_eq!(stream_id, 1);
389                    match data {
390                        SessionData::Video { .. } => got_video = true,
391                        SessionData::Audio { .. } => got_audio = true,
392                        SessionData::Amf0 { .. } => got_metadata = true,
393                    }
394                    response.send(Ok(())).expect("failed to send response");
395                }
396                _ => panic!("unexpected event"),
397            }
398
399            if got_video && got_audio && got_metadata {
400                break;
401            }
402        }
403
404        assert!(got_video);
405        assert!(got_audio);
406        assert!(got_metadata);
407
408        ffmpeg.kill().await.expect("failed to kill ffmpeg");
409
410        while let Some(data) = ffmpeg_event_reciever
411            .recv()
412            .with_timeout(Duration::from_millis(1000))
413            .await
414            .expect("timed out")
415        {
416            match data {
417                Event::Data { response, .. } => {
418                    response.send(Ok(())).expect("failed to send response");
419                }
420                _ => panic!("unexpected event"),
421            }
422        }
423
424        // the server should have detected the ffmpeg process has died uncleanly
425        assert!(
426            !ffmpeg_handle
427                .await
428                .expect("failed to join handle")
429                .expect("failed to handle ffmpeg connection")
430        );
431    }
432}