1#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
3#![cfg_attr(feature = "docs", doc = "## Feature flags")]
11#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
12#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
64#![cfg_attr(docsrs, feature(doc_auto_cfg))]
65#![deny(missing_docs)]
66#![deny(unsafe_code)]
67#![deny(unreachable_pub)]
68#![deny(clippy::mod_module_files)]
69
70pub mod chunk;
71pub mod command_messages;
72pub mod error;
73pub mod handshake;
74pub mod messages;
75pub mod protocol_control_messages;
76pub mod session;
77pub mod user_control_messages;
78
79pub use session::server::ServerSession;
80
81#[cfg(feature = "docs")]
83#[scuffle_changelog::changelog]
84pub mod changelog {}
85
86#[cfg(test)]
87#[cfg_attr(all(test, coverage_nightly), coverage(off))]
88mod tests {
89 use std::path::PathBuf;
90 use std::time::Duration;
91
92 use scuffle_future_ext::FutureExt;
93 use tokio::process::Command;
94 use tokio::sync::{mpsc, oneshot};
95
96 use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
97
98 fn file_path(item: &str) -> PathBuf {
99 if let Some(env) = std::env::var_os("ASSETS_DIR") {
100 PathBuf::from(env).join(item)
101 } else {
102 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("../../assets/{item}"))
103 }
104 }
105
106 enum Event {
107 Publish {
108 stream_id: u32,
109 app_name: String,
110 stream_name: String,
111 response: oneshot::Sender<Result<(), ServerSessionError>>,
112 },
113 Unpublish {
114 stream_id: u32,
115 response: oneshot::Sender<Result<(), ServerSessionError>>,
116 },
117 Data {
118 stream_id: u32,
119 data: SessionData,
120 response: oneshot::Sender<Result<(), ServerSessionError>>,
121 },
122 }
123
124 struct Handler(mpsc::Sender<Event>);
125
126 impl SessionHandler for Handler {
127 async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
128 let (response, reciever) = oneshot::channel();
129
130 self.0
131 .send(Event::Publish {
132 stream_id,
133 app_name: app_name.to_string(),
134 stream_name: stream_name.to_string(),
135 response,
136 })
137 .await
138 .unwrap();
139
140 reciever.await.unwrap()
141 }
142
143 async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
144 let (response, reciever) = oneshot::channel();
145
146 self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
147
148 reciever.await.unwrap()
149 }
150
151 async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
152 let (response, reciever) = oneshot::channel();
153 self.0
154 .send(Event::Data {
155 stream_id,
156 data,
157 response,
158 })
159 .await
160 .unwrap();
161
162 reciever.await.unwrap()
163 }
164 }
165
166 #[cfg(not(valgrind))] #[tokio::test]
168 async fn test_basic_rtmp_clean() {
169 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
170 let addr = listener.local_addr().unwrap();
171
172 let _ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
173 .args([
174 "-loglevel",
175 "debug",
176 "-re",
177 "-i",
178 file_path("avc_aac.mp4").to_str().expect("failed to get path"),
179 "-r",
180 "30",
181 "-t",
182 "1", "-c",
184 "copy",
185 "-f",
186 "flv",
187 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
188 ])
189 .stdout(std::process::Stdio::inherit())
190 .stderr(std::process::Stdio::inherit())
191 .spawn()
192 .expect("failed to execute ffmpeg");
193
194 let (ffmpeg_stream, _) = listener
195 .accept()
196 .with_timeout(Duration::from_millis(1000))
197 .await
198 .expect("timed out")
199 .expect("failed to accept");
200
201 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
202 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
203 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
204
205 (
206 tokio::spawn(async move {
207 let r = session.run().await;
208 println!("ffmpeg session ended: {r:?}");
209 r
210 }),
211 ffmpeg_event_reciever,
212 )
213 };
214
215 let event = ffmpeg_event_reciever
216 .recv()
217 .with_timeout(Duration::from_millis(1000))
218 .await
219 .expect("timed out")
220 .expect("failed to recv event");
221
222 match event {
223 Event::Publish {
224 stream_id,
225 app_name,
226 stream_name,
227 response,
228 } => {
229 assert_eq!(stream_id, 1);
230 assert_eq!(app_name, "live");
231 assert_eq!(stream_name, "stream-key");
232 response.send(Ok(())).expect("failed to send response");
233 }
234 _ => panic!("unexpected event"),
235 }
236
237 let mut got_video = false;
238 let mut got_audio = false;
239 let mut got_metadata = false;
240
241 while let Some(data) = ffmpeg_event_reciever
242 .recv()
243 .with_timeout(Duration::from_millis(1000))
244 .await
245 .expect("timed out")
246 {
247 match data {
248 Event::Data {
249 stream_id,
250 response,
251 data,
252 ..
253 } => {
254 match data {
255 SessionData::Video { .. } => got_video = true,
256 SessionData::Audio { .. } => got_audio = true,
257 SessionData::Amf0 { .. } => got_metadata = true,
258 }
259 response.send(Ok(())).expect("failed to send response");
260 assert_eq!(stream_id, 1);
261 }
262 Event::Unpublish { stream_id, response } => {
263 assert_eq!(stream_id, 1);
264 response.send(Ok(())).expect("failed to send response");
265 break;
266 }
267 _ => panic!("unexpected event"),
268 }
269 }
270
271 assert!(got_video);
272 assert!(got_audio);
273 assert!(got_metadata);
274
275 if ffmpeg_event_reciever
276 .recv()
277 .with_timeout(Duration::from_millis(1000))
278 .await
279 .expect("timed out")
280 .is_some()
281 {
282 panic!("unexpected event");
283 }
284
285 assert!(
286 ffmpeg_handle
287 .await
288 .expect("failed to join handle")
289 .expect("failed to handle ffmpeg connection")
290 );
291
292 }
295
296 #[cfg(all(not(valgrind), not(windows)))]
299 #[tokio::test]
300 async fn test_basic_rtmp_unclean() {
301 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
302 let addr = listener.local_addr().unwrap();
303
304 println!("ffmpeg from {}", std::env::var("FFMPEG").unwrap_or("ffmpeg".into()));
305
306 let mut ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
307 .args([
308 "-loglevel",
309 "debug",
310 "-re",
311 "-i",
312 file_path("avc_aac.mp4").to_str().expect("failed to get path"),
313 "-r",
314 "30",
315 "-t",
316 "1", "-c",
318 "copy",
319 "-f",
320 "flv",
321 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
322 ])
323 .stdout(std::process::Stdio::inherit())
324 .stderr(std::process::Stdio::inherit())
325 .spawn()
326 .expect("failed to execute ffmpeg");
327
328 let (ffmpeg_stream, _) = listener
329 .accept()
330 .with_timeout(Duration::from_millis(1000))
331 .await
332 .expect("timed out")
333 .expect("failed to accept");
334
335 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
336 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
337 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
338
339 (
340 tokio::spawn(async move {
341 let r = session.run().await;
342 println!("ffmpeg session ended: {r:?}");
343 r
344 }),
345 ffmpeg_event_reciever,
346 )
347 };
348
349 let event = ffmpeg_event_reciever
350 .recv()
351 .with_timeout(Duration::from_millis(1000))
352 .await
353 .expect("timed out")
354 .expect("failed to recv event");
355
356 match event {
357 Event::Publish {
358 stream_id,
359 app_name,
360 stream_name,
361 response,
362 } => {
363 assert_eq!(stream_id, 1);
364 assert_eq!(app_name, "live");
365 assert_eq!(stream_name, "stream-key");
366 response.send(Ok(())).expect("failed to send response");
367 }
368 _ => panic!("unexpected event"),
369 }
370
371 let mut got_video = false;
372 let mut got_audio = false;
373 let mut got_metadata = false;
374
375 while let Some(data) = ffmpeg_event_reciever
376 .recv()
377 .with_timeout(Duration::from_millis(1000))
378 .await
379 .expect("timed out")
380 {
381 match data {
382 Event::Data {
383 stream_id,
384 response,
385 data,
386 ..
387 } => {
388 assert_eq!(stream_id, 1);
389 match data {
390 SessionData::Video { .. } => got_video = true,
391 SessionData::Audio { .. } => got_audio = true,
392 SessionData::Amf0 { .. } => got_metadata = true,
393 }
394 response.send(Ok(())).expect("failed to send response");
395 }
396 _ => panic!("unexpected event"),
397 }
398
399 if got_video && got_audio && got_metadata {
400 break;
401 }
402 }
403
404 assert!(got_video);
405 assert!(got_audio);
406 assert!(got_metadata);
407
408 ffmpeg.kill().await.expect("failed to kill ffmpeg");
409
410 while let Some(data) = ffmpeg_event_reciever
411 .recv()
412 .with_timeout(Duration::from_millis(1000))
413 .await
414 .expect("timed out")
415 {
416 match data {
417 Event::Data { response, .. } => {
418 response.send(Ok(())).expect("failed to send response");
419 }
420 _ => panic!("unexpected event"),
421 }
422 }
423
424 assert!(
426 !ffmpeg_handle
427 .await
428 .expect("failed to join handle")
429 .expect("failed to handle ffmpeg connection")
430 );
431 }
432}