Mustafa Can Yücel
blog-post-24

Streaming RTSP on an HTML Page Part 3

Introduction

In the previous blog posts, we have seen how to stream RTSP on an HTML page both 24/7 and on-demand. In this blog post, we will see how to stream RTSP on a SvelteKit web app using on-demand approach. To achieve modularity, we will create a Camera component that can be put anywhere on any page.

We will need to modify the Go server code and the Caddyfile slightly to manage the CORS policy; for the api endpoints initiate and heartbeat, we will create a middleware in the Go server app to handle the CORS policy. For the RTSP stream, we will use the same approach as in the previous blog post; the CORS policy will be handled by the Caddy server.

Creating a Svelte/SvelteKit App

The creation of the application itself is not the focus of this blog post, so I will not go into detail about it. You can follow the documentation on the SvelteKit website to create a new project. Once you have a working app with basic scaffolding, you can continue with the next steps.

Updating the Server App

As we will be calling the server api endpoints from the SvelteKit app, and our app domain will be different than the server domain, we need to handle the CORS policy for the api endpoints. CORS, or Cross-Origin Resource Sharing, is a security feature that restricts what resources a web page can request from another domain. It is designed to prevent a malicious website from stealing data from another website, and it is enforced by all the modern browsers. It is one of the features that make the web secure, however, it can be a pain to deal with when you are developing a web app and combining services from different domains. We will create a middleware in the Go server app to handle the CORS policy for the api endpoints initiate and heartbeat:

package main
import (
    "encoding/json"
    "os/exec"
    "log"
    "net/http"
    "sync"
    "time"
)

type StreamManager struct {
    mu              sync.Mutex
    isActive        bool
    lastHeartbeat   time.Time
    timeoutDuration time.Duration
}

type Response struct {
    Status  string `json:"status"`
    Message string `json:"message"`
    URL     string `json:"url,omitempty"`
}

func NewStreamManager() *StreamManager {
    sm := &StreamManager{
        timeoutDuration: 5 * time.Minute,
    }

    // Start monitoring routine
    go sm.monitor()

    return sm
}

// Add CORS middleware
func enableCors(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Set CORS headers
        w.Header().Set("Access-Control-Allow-Origin", "*")
        w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
        w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type")

        // Handle preflight
        if r.Method == "OPTIONS" {
            w.WriteHeader(http.StatusOK)
            return
        }

        // Call the actual handler
        next(w, r)
    }
}

func (sm *StreamManager) stopService() error {
    log.Printf("Attempting to stop camera-stream.service...")
    // First try to stop service normally
    cmd := exec.Command("systemctl", "stop", "camera-stream.service")
    output, err := cmd.CombinedOutput()
    if err != nil {
        log.Printf("Error stopping service: %v, output: %s", err, string(output))
        return err
    }

    // Double check if ffmpeg is still running
    time.Sleep(2 * time.Second)  // Give it a moment
    checkCmd := exec.Command("pgrep", "ffmpeg")
    if output, _ := checkCmd.CombinedOutput(); len(output) > 0 {
        log.Printf("FFmpeg still running, forcing kill...")
        killCmd := exec.Command("pkill", "-9", "ffmpeg")
        if err := killCmd.Run(); err != nil {
            log.Printf("Error force killing ffmpeg: %v", err)
        }
    }

    log.Printf("Service stop command completed successfully")
    return nil
}

func (sm *StreamManager) isServiceRunning() (bool, error) {
    cmd := exec.Command("systemctl", "is-active", "camera-stream.service")
    output, err := cmd.CombinedOutput()
    if err != nil {
        return false, nil  // Service is not running
    }
    return string(output) == "active\n", nil
}

func (sm *StreamManager) InitiateStream(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        log.Printf("Method not allowed: %s", r.Method)
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    sm.mu.Lock()
    defer sm.mu.Unlock()

    // Check if already active
    if sm.isActive {
        log.Printf("Stream already active, returning existing URL")
        resp := Response{
            Status:  "success",
            Message: "Stream already active",
            URL:     "/playlist.m3u8",
        }
        sendJSONResponse(w, resp)
        return
    }

    log.Printf("Starting new stream...")
    // Start the service
    if err := sm.startService(); err != nil {
        log.Printf("Failed to start service: %v", err)
        http.Error(w, "Failed to start stream", http.StatusInternalServerError)
        return
    }

    sm.isActive = true
    sm.lastHeartbeat = time.Now()

    log.Printf("Stream started successfully")
    resp := Response{
        Status:  "success",
        Message: "Stream initiated",
        URL:     "/playlist.m3u8",
    }
    sendJSONResponse(w, resp)
}

func (sm *StreamManager) Heartbeat(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    sm.mu.Lock()
    defer sm.mu.Unlock()

    if !sm.isActive {
        http.Error(w, "No active stream", http.StatusNotFound)
        return
    }

    sm.lastHeartbeat = time.Now()

    resp := Response{
        Status:  "success",
        Message: "Heartbeat received",
    }
    sendJSONResponse(w, resp)
}

func (sm *StreamManager) monitor() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        sm.mu.Lock()
        if sm.isActive && time.Since(sm.lastHeartbeat) > sm.timeoutDuration {
            log.Println("Stream timed out, stopping service...")
            if err := sm.stopService(); err != nil {
                log.Printf("Error stopping service: %v", err)
            }
            sm.isActive = false
        }
        sm.mu.Unlock()
    }
}

func sendJSONResponse(w http.ResponseWriter, resp Response) {
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(resp)
}

func main() {
    sm := NewStreamManager()

    http.HandleFunc("/initiate", enableCors(sm.InitiateStream))
    http.HandleFunc("/heartbeat", enableCors(sm.Heartbeat))

    log.Println("Server starting on :8080...")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
The differences between the previous version of the server app and this version are:
  • We have defined a new middleware function enableCors that sets the CORS headers for the api endpoints initiate and heartbeat.
  • We have added the middleware function enableCors to the api endpoints initiate and heartbeat.
For increased security and to prevent unauthorized access to the api endpoints:
  • You can restrict the allowed origins by changing the value of the Access-Control-Allow-Origin header from *, which allows all domains, to the domain of your SvelteKit app. Don't forget to add your development domain as well, which is usually localhost and some ports.
  • You can restrict the allowed methods by changing the value of the Access-Control-Allow-Methods header to only the methods that your app will use.
  • You can use a more secure method to handle the preflight requests by checking the Origin header and only allowing the requests from the domains that you trust.
  • You can use an api key or a token to authenticate the requests to the api endpoints.

Updating Caddyfile

Since the Go server handles the CORS policy for the api endpoints now, we can simplify the Caddyfile by removing the CORS policy for the api endpoints. The Caddyfile should look like this:

stream.test.example.com {
        handle_path /api/* {
                uri strip_prefix /api
                reverse_proxy localhost:8080
        }
                                                                                                                                                                                                                            handle /* {
                root * /var/www/hls
                file_server
                header {
                        Access-Control-Allow-Origin *
                        Cache-Control "no-cache, no-store, must-revalidate"
                }
        }
}
You can restrict the allowed origins by changing the value of the Access-Control-Allow-Origin header from * to the domain of your SvelteKit app. Don't forget to add your development domain as well, which is usually localhost and some ports.

Svelte Component

We will create a new component that can do the following:

  • Send a POST request to the initiate api endpoint to start the RTSP stream.
  • Send a POST request to the heartbeat api endpoint to keep the stream alive.
  • Display the RTSP stream on the page.
  • Handle errors gracefully and display them to the user.
  • Allows the user to retry on failures.
  • When the initiate enpoint is called, it takes between 5-30 seconds for the .m3u8 playlist file to be ready. And even when this file is created on the server, it still takes few more seconds for the server to write the content .ts files into it. For this reason, the component will poll the playlist.m3u8 file every 2 seconds for a maximum of 30 seconds to check if it contains any .ts files. If the playlist contains any .ts files, the component will start the player and display the stream. If the playlist does not contain any .ts files after 30 seconds, the component will display an error message to the user.
  • The video element is added to the DOM only when the player is ready to play the stream. This is to prevent the browser from trying to load the stream before the player is ready. Therefore the component should be able to handle the case where the video element is not available when the player is ready.
  • When the encoding cannot keep up with the streaming (i.e. encoding speed is less than 1.0x), the Hls will return a Network error or Media error and the player will try to recover from it. If the player cannot recover from the error, the component will display an error message to the user.
<!-- StreamPlayer.svelte -->
<script lang="ts">
    import { onDestroy } from 'svelte';
    import Hls from 'hls.js';
    import { Play } from 'lucide-svelte';

    export let text: string = 'Kamera';
    export let baseUrl: string = 'https://stream.test.example.com';
    
    let status: 'idle' | 'initializing' | 'loading-player' | 'ready' | 'error' = 'idle';
    let errorMessage: string = '';
    let videoElement: HTMLVideoElement;
    let hls: Hls | null = null;
    let heartbeatInterval: NodeJS.Timeout | null = null;

    const HEARTBEAT_INTERVAL = 60000;
    const INITIAL_DELAY = 5000;
    const POLL_INTERVAL = 2000;
    const MAX_POLL_ATTEMPTS = 15; // 30 seconds total polling time

    async function checkPlaylistReady(): Promise<boolean> {
        try {
            const response = await fetch(`${baseUrl}/playlist.m3u8`);
            if (!response.ok) return false;
            
            const content = await response.text();
            // Check if the playlist contains any .ts files
            return content.includes('.ts');
        } catch (error) {
            console.log('Playlist check failed:', error);
            return false;
        }
    }

    async function waitForPlaylist(): Promise<void> {
        console.log('Waiting for initial delay...');
        await new Promise(resolve => setTimeout(resolve, INITIAL_DELAY));
        
        console.log('Starting to poll playlist...');
        let attempts = 0;
        
        while (attempts < MAX_POLL_ATTEMPTS) {
            console.log(`Checking playlist (attempt ${attempts + 1}/${MAX_POLL_ATTEMPTS})...`);
            if (await checkPlaylistReady()) {
                console.log('Playlist is ready!');
                return;
            }
            
            await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL));
            attempts++;
        }
        
        throw new Error('Timeout waiting for stream to become ready');
    }

    async function initializeStream() {
        try {
            status = 'initializing';
            errorMessage = '';

            if (!Hls.isSupported()) {
                throw new Error('HLS is not supported in your browser');
            }

            const response = await fetch(`${baseUrl}/api/initiate`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json'
                }
            });

            const data = await response.json();
            console.log('Initiate response:', data);

            if (data.status === 'success') {
                console.log('Stream initialization successful, waiting for stream...');
                await waitForPlaylist();
                status = 'loading-player';
                await new Promise(resolve => setTimeout(resolve, 100));
                await startPlayer();
                startHeartbeat();
            } else {
                throw new Error(data.message || 'Failed to initialize stream');
            }
        } catch (error) {
            console.error('Stream initialization error:', error);
            handleError(error instanceof Error ? error.message : 'Unknown error occurred');
        }
    }

    async function startPlayer() {
        return new Promise<void>((resolve, reject) => {
            let retryCount = 0;
            const maxRetries = 5;
            
            const tryInitPlayer = () => {
                if (!videoElement) {
                    if (retryCount < maxRetries) {
                        retryCount++;
                        console.log(`Video element not found, retry ${retryCount}/${maxRetries}`);
                        setTimeout(tryInitPlayer, 200);
                        return;
                    }
                    reject(new Error('Video element not found after retries'));
                    return;
                }

                try {
                    cleanup();

                    hls = new Hls({
                        debug: false,
                        enableWorker: true,
                        lowLatencyMode: true,
                        backBufferLength: 90
                    });

                    const streamUrl = `${baseUrl}/playlist.m3u8`;
                    console.log('Loading stream URL:', streamUrl);

                    hls.loadSource(streamUrl);
                    hls.attachMedia(videoElement);

                    hls.on(Hls.Events.ERROR, (event, data) => {
                        if (data.fatal) {
                            switch(data.type) {
                                case Hls.ErrorTypes.NETWORK_ERROR:
                                    console.log('Network error, attempting recovery...');
                                    hls?.startLoad();
                                    break;
                                case Hls.ErrorTypes.MEDIA_ERROR:
                                    console.log('Media error, attempting recovery...');
                                    hls?.recoverMediaError();
                                    break;
                                default:
                                    handleError('Fatal streaming error occurred');
                                    break;
                            }
                        }
                    });

                    hls.on(Hls.Events.MANIFEST_PARSED, () => {
                        console.log('Manifest parsed successfully, starting playback');
                        status = 'ready';
                        videoElement.play()
                            .then(() => {
                                console.log('Playback started successfully');
                                resolve();
                            })
                            .catch(e => {
                                console.error('Playback error:', e);
                                reject(new Error(`Playback error: ${e.message}`));
                            });
                    });
                } catch (error) {
                    reject(error);
                }
            };

            tryInitPlayer();
        });
    }

    function startHeartbeat() {
        stopHeartbeat();
        
        heartbeatInterval = setInterval(async () => {
            try {
                const response = await fetch(`${baseUrl}/api/heartbeat`, {
                    method: 'POST'
                });

                if (!response.ok) {
                    throw new Error('Heartbeat failed');
                }
            } catch (error) {
                console.error('Heartbeat error:', error);
                handleError('Connection lost');
            }
        }, HEARTBEAT_INTERVAL);
    }

    function stopHeartbeat() {
        if (heartbeatInterval) {
            clearInterval(heartbeatInterval);
            heartbeatInterval = null;
        }
    }

    function handleError(message: string) {
        console.error('Stream error:', message);
        status = 'error';
        errorMessage = message;
        cleanup();
    }

    function cleanup() {
        if (hls) {
            hls.destroy();
            hls = null;
        }
        
        if (videoElement) {
            videoElement.removeAttribute('src');
            videoElement.load();
        }
        
        stopHeartbeat();
    }

    async function startStream() {
        await initializeStream();
    }

    async function retry() {
        cleanup();
        await initializeStream();
    }

    onDestroy(() => {
        cleanup();
    });
</script>

<div class="relative flex h-96 w-full flex-col items-center justify-center rounded-lg bg-gray-100 lg:w-3/4">
    {#if status === 'idle'}
        <div class="flex flex-col items-center space-y-4 text-gray-600">
            <h2 class="text-2xl font-bold">{text}</h2>
            <button
                on:click={startStream}
                class="flex items-center space-x-2 rounded-full bg-blue-500 p-4 text-white hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"
                aria-label="Play stream"
            >
                <Play />
            </button>
        </div>
    {:else if status === 'initializing'}
        <div class="flex flex-col items-center space-y-4 text-gray-600">
            <h2 class="text-2xl font-bold">{text}</h2>
            <div class="flex items-center space-x-2">
                <div class="h-4 w-4 animate-spin rounded-full border-2 border-gray-600 border-t-transparent"></div>
                <p>Starting stream...</p>
            </div>
        </div>
    {:else if status === 'loading-player' || status === 'ready'}
        <video
            bind:this={videoElement}
            class="h-full w-full rounded-lg object-contain bg-gray-100"
            controls
            playsinline
            muted
        ></video>
    {:else if status === 'error'}
        <div class="flex flex-col items-center space-y-4 text-gray-600">
            <h2 class="text-2xl font-bold">{text}</h2>
            <p class="text-red-500">{errorMessage}</p>
            <button
                on:click={retry}
                class="rounded bg-blue-500 px-4 py-2 text-white hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"
            >
                Retry
            </button>
        </div>
    {/if}
</div>

To use this component, you can import it into your SvelteKit app and use it like this:

<script lang="ts">
    import StreamPlayer from '$lib/StreamPlayer.svelte';
</script>

<div class="container mx-auto px-4 my-4">
	<div class="flex flex-col gap-4 lg:flex-row">
		<Camera text="Camera 1" baseUrl="https://stream1.test.example.com" />
		<Camera text="Camera 2" baseUrl="https://stream2.test.example.com"/>
	</div>
</div>