
Streaming RTSP on an HTML Page Part 3
Introduction
In the previous blog posts, we have seen how to stream RTSP on an HTML page both 24/7 and on-demand. In this blog post, we will see how to stream RTSP on a SvelteKit web app using on-demand approach. To achieve modularity, we will create a Camera
component that can be put anywhere on any page.
We will need to modify the Go server code and the Caddyfile slightly to manage the CORS policy; for the api endpoints initiate
and heartbeat
, we will create a middleware in the Go server app to handle the CORS policy. For the RTSP stream, we will use the same approach as in the previous blog post; the CORS policy will be handled by the Caddy server.
Creating a Svelte/SvelteKit App
The creation of the application itself is not the focus of this blog post, so I will not go into detail about it. You can follow the documentation on the SvelteKit website to create a new project. Once you have a working app with basic scaffolding, you can continue with the next steps.
Updating the Server App
As we will be calling the server api endpoints from the SvelteKit app, and our app domain will be different than the server domain, we need to handle the CORS policy for the api endpoints. CORS, or Cross-Origin Resource Sharing, is a security feature that restricts what resources a web page can request from another domain. It is designed to prevent a malicious website from stealing data from another website, and it is enforced by all the modern browsers. It is one of the features that make the web secure, however, it can be a pain to deal with when you are developing a web app and combining services from different domains. We will create a middleware in the Go server app to handle the CORS policy for the api endpoints initiate
and heartbeat
:
package main
import (
"encoding/json"
"os/exec"
"log"
"net/http"
"sync"
"time"
)
type StreamManager struct {
mu sync.Mutex
isActive bool
lastHeartbeat time.Time
timeoutDuration time.Duration
}
type Response struct {
Status string `json:"status"`
Message string `json:"message"`
URL string `json:"url,omitempty"`
}
func NewStreamManager() *StreamManager {
sm := &StreamManager{
timeoutDuration: 5 * time.Minute,
}
// Start monitoring routine
go sm.monitor()
return sm
}
// Add CORS middleware
func enableCors(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Set CORS headers
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type")
// Handle preflight
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
// Call the actual handler
next(w, r)
}
}
func (sm *StreamManager) stopService() error {
log.Printf("Attempting to stop camera-stream.service...")
// First try to stop service normally
cmd := exec.Command("systemctl", "stop", "camera-stream.service")
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Error stopping service: %v, output: %s", err, string(output))
return err
}
// Double check if ffmpeg is still running
time.Sleep(2 * time.Second) // Give it a moment
checkCmd := exec.Command("pgrep", "ffmpeg")
if output, _ := checkCmd.CombinedOutput(); len(output) > 0 {
log.Printf("FFmpeg still running, forcing kill...")
killCmd := exec.Command("pkill", "-9", "ffmpeg")
if err := killCmd.Run(); err != nil {
log.Printf("Error force killing ffmpeg: %v", err)
}
}
log.Printf("Service stop command completed successfully")
return nil
}
func (sm *StreamManager) isServiceRunning() (bool, error) {
cmd := exec.Command("systemctl", "is-active", "camera-stream.service")
output, err := cmd.CombinedOutput()
if err != nil {
return false, nil // Service is not running
}
return string(output) == "active\n", nil
}
func (sm *StreamManager) InitiateStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
log.Printf("Method not allowed: %s", r.Method)
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
sm.mu.Lock()
defer sm.mu.Unlock()
// Check if already active
if sm.isActive {
log.Printf("Stream already active, returning existing URL")
resp := Response{
Status: "success",
Message: "Stream already active",
URL: "/playlist.m3u8",
}
sendJSONResponse(w, resp)
return
}
log.Printf("Starting new stream...")
// Start the service
if err := sm.startService(); err != nil {
log.Printf("Failed to start service: %v", err)
http.Error(w, "Failed to start stream", http.StatusInternalServerError)
return
}
sm.isActive = true
sm.lastHeartbeat = time.Now()
log.Printf("Stream started successfully")
resp := Response{
Status: "success",
Message: "Stream initiated",
URL: "/playlist.m3u8",
}
sendJSONResponse(w, resp)
}
func (sm *StreamManager) Heartbeat(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
sm.mu.Lock()
defer sm.mu.Unlock()
if !sm.isActive {
http.Error(w, "No active stream", http.StatusNotFound)
return
}
sm.lastHeartbeat = time.Now()
resp := Response{
Status: "success",
Message: "Heartbeat received",
}
sendJSONResponse(w, resp)
}
func (sm *StreamManager) monitor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
sm.mu.Lock()
if sm.isActive && time.Since(sm.lastHeartbeat) > sm.timeoutDuration {
log.Println("Stream timed out, stopping service...")
if err := sm.stopService(); err != nil {
log.Printf("Error stopping service: %v", err)
}
sm.isActive = false
}
sm.mu.Unlock()
}
}
func sendJSONResponse(w http.ResponseWriter, resp Response) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
func main() {
sm := NewStreamManager()
http.HandleFunc("/initiate", enableCors(sm.InitiateStream))
http.HandleFunc("/heartbeat", enableCors(sm.Heartbeat))
log.Println("Server starting on :8080...")
log.Fatal(http.ListenAndServe(":8080", nil))
}
The differences between the previous version of the server app and this version are:
- We have defined a new middleware function
enableCors
that sets the CORS headers for the api endpointsinitiate
andheartbeat
. - We have added the middleware function
enableCors
to the api endpointsinitiate
andheartbeat
.
-
You can restrict the allowed origins by changing the value of the
Access-Control-Allow-Origin
header from*
, which allows all domains, to the domain of your SvelteKit app. Don't forget to add your development domain as well, which is usuallylocalhost
and some ports. -
You can restrict the allowed methods by changing the value of the
Access-Control-Allow-Methods
header to only the methods that your app will use. -
You can use a more secure method to handle the preflight requests by checking the
Origin
header and only allowing the requests from the domains that you trust. - You can use an api key or a token to authenticate the requests to the api endpoints.
Updating Caddyfile
Since the Go server handles the CORS policy for the api endpoints now, we can simplify the Caddyfile by removing the CORS policy for the api endpoints. The Caddyfile should look like this:
stream.test.example.com {
handle_path /api/* {
uri strip_prefix /api
reverse_proxy localhost:8080
}
handle /* {
root * /var/www/hls
file_server
header {
Access-Control-Allow-Origin *
Cache-Control "no-cache, no-store, must-revalidate"
}
}
}
You can restrict the allowed origins by changing the value of the Access-Control-Allow-Origin
header from *
to the domain of your SvelteKit app. Don't forget to add your development domain as well, which is usually localhost
and some ports.
Svelte Component
We will create a new component that can do the following:
-
Send a POST request to the
initiate
api endpoint to start the RTSP stream. -
Send a POST request to the
heartbeat
api endpoint to keep the stream alive. - Display the RTSP stream on the page.
- Handle errors gracefully and display them to the user.
- Allows the user to retry on failures.
-
When the
initiate
enpoint is called, it takes between 5-30 seconds for the .m3u8 playlist file to be ready. And even when this file is created on the server, it still takes few more seconds for the server to write the content.ts
files into it. For this reason, the component will poll theplaylist.m3u8
file every 2 seconds for a maximum of 30 seconds to check if it contains any.ts
files. If the playlist contains any.ts
files, the component will start the player and display the stream. If the playlist does not contain any.ts
files after 30 seconds, the component will display an error message to the user. - The video element is added to the DOM only when the player is ready to play the stream. This is to prevent the browser from trying to load the stream before the player is ready. Therefore the component should be able to handle the case where the video element is not available when the player is ready.
-
When the encoding cannot keep up with the streaming (i.e. encoding speed is less than 1.0x), the Hls will return a
Network error
orMedia error
and the player will try to recover from it. If the player cannot recover from the error, the component will display an error message to the user.
<!-- StreamPlayer.svelte -->
<script lang="ts">
import { onDestroy } from 'svelte';
import Hls from 'hls.js';
import { Play } from 'lucide-svelte';
export let text: string = 'Kamera';
export let baseUrl: string = 'https://stream.test.example.com';
let status: 'idle' | 'initializing' | 'loading-player' | 'ready' | 'error' = 'idle';
let errorMessage: string = '';
let videoElement: HTMLVideoElement;
let hls: Hls | null = null;
let heartbeatInterval: NodeJS.Timeout | null = null;
const HEARTBEAT_INTERVAL = 60000;
const INITIAL_DELAY = 5000;
const POLL_INTERVAL = 2000;
const MAX_POLL_ATTEMPTS = 15; // 30 seconds total polling time
async function checkPlaylistReady(): Promise<boolean> {
try {
const response = await fetch(`${baseUrl}/playlist.m3u8`);
if (!response.ok) return false;
const content = await response.text();
// Check if the playlist contains any .ts files
return content.includes('.ts');
} catch (error) {
console.log('Playlist check failed:', error);
return false;
}
}
async function waitForPlaylist(): Promise<void> {
console.log('Waiting for initial delay...');
await new Promise(resolve => setTimeout(resolve, INITIAL_DELAY));
console.log('Starting to poll playlist...');
let attempts = 0;
while (attempts < MAX_POLL_ATTEMPTS) {
console.log(`Checking playlist (attempt ${attempts + 1}/${MAX_POLL_ATTEMPTS})...`);
if (await checkPlaylistReady()) {
console.log('Playlist is ready!');
return;
}
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL));
attempts++;
}
throw new Error('Timeout waiting for stream to become ready');
}
async function initializeStream() {
try {
status = 'initializing';
errorMessage = '';
if (!Hls.isSupported()) {
throw new Error('HLS is not supported in your browser');
}
const response = await fetch(`${baseUrl}/api/initiate`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
});
const data = await response.json();
console.log('Initiate response:', data);
if (data.status === 'success') {
console.log('Stream initialization successful, waiting for stream...');
await waitForPlaylist();
status = 'loading-player';
await new Promise(resolve => setTimeout(resolve, 100));
await startPlayer();
startHeartbeat();
} else {
throw new Error(data.message || 'Failed to initialize stream');
}
} catch (error) {
console.error('Stream initialization error:', error);
handleError(error instanceof Error ? error.message : 'Unknown error occurred');
}
}
async function startPlayer() {
return new Promise<void>((resolve, reject) => {
let retryCount = 0;
const maxRetries = 5;
const tryInitPlayer = () => {
if (!videoElement) {
if (retryCount < maxRetries) {
retryCount++;
console.log(`Video element not found, retry ${retryCount}/${maxRetries}`);
setTimeout(tryInitPlayer, 200);
return;
}
reject(new Error('Video element not found after retries'));
return;
}
try {
cleanup();
hls = new Hls({
debug: false,
enableWorker: true,
lowLatencyMode: true,
backBufferLength: 90
});
const streamUrl = `${baseUrl}/playlist.m3u8`;
console.log('Loading stream URL:', streamUrl);
hls.loadSource(streamUrl);
hls.attachMedia(videoElement);
hls.on(Hls.Events.ERROR, (event, data) => {
if (data.fatal) {
switch(data.type) {
case Hls.ErrorTypes.NETWORK_ERROR:
console.log('Network error, attempting recovery...');
hls?.startLoad();
break;
case Hls.ErrorTypes.MEDIA_ERROR:
console.log('Media error, attempting recovery...');
hls?.recoverMediaError();
break;
default:
handleError('Fatal streaming error occurred');
break;
}
}
});
hls.on(Hls.Events.MANIFEST_PARSED, () => {
console.log('Manifest parsed successfully, starting playback');
status = 'ready';
videoElement.play()
.then(() => {
console.log('Playback started successfully');
resolve();
})
.catch(e => {
console.error('Playback error:', e);
reject(new Error(`Playback error: ${e.message}`));
});
});
} catch (error) {
reject(error);
}
};
tryInitPlayer();
});
}
function startHeartbeat() {
stopHeartbeat();
heartbeatInterval = setInterval(async () => {
try {
const response = await fetch(`${baseUrl}/api/heartbeat`, {
method: 'POST'
});
if (!response.ok) {
throw new Error('Heartbeat failed');
}
} catch (error) {
console.error('Heartbeat error:', error);
handleError('Connection lost');
}
}, HEARTBEAT_INTERVAL);
}
function stopHeartbeat() {
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
}
function handleError(message: string) {
console.error('Stream error:', message);
status = 'error';
errorMessage = message;
cleanup();
}
function cleanup() {
if (hls) {
hls.destroy();
hls = null;
}
if (videoElement) {
videoElement.removeAttribute('src');
videoElement.load();
}
stopHeartbeat();
}
async function startStream() {
await initializeStream();
}
async function retry() {
cleanup();
await initializeStream();
}
onDestroy(() => {
cleanup();
});
</script>
<div class="relative flex h-96 w-full flex-col items-center justify-center rounded-lg bg-gray-100 lg:w-3/4">
{#if status === 'idle'}
<div class="flex flex-col items-center space-y-4 text-gray-600">
<h2 class="text-2xl font-bold">{text}</h2>
<button
on:click={startStream}
class="flex items-center space-x-2 rounded-full bg-blue-500 p-4 text-white hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"
aria-label="Play stream"
>
<Play />
</button>
</div>
{:else if status === 'initializing'}
<div class="flex flex-col items-center space-y-4 text-gray-600">
<h2 class="text-2xl font-bold">{text}</h2>
<div class="flex items-center space-x-2">
<div class="h-4 w-4 animate-spin rounded-full border-2 border-gray-600 border-t-transparent"></div>
<p>Starting stream...</p>
</div>
</div>
{:else if status === 'loading-player' || status === 'ready'}
<video
bind:this={videoElement}
class="h-full w-full rounded-lg object-contain bg-gray-100"
controls
playsinline
muted
></video>
{:else if status === 'error'}
<div class="flex flex-col items-center space-y-4 text-gray-600">
<h2 class="text-2xl font-bold">{text}</h2>
<p class="text-red-500">{errorMessage}</p>
<button
on:click={retry}
class="rounded bg-blue-500 px-4 py-2 text-white hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"
>
Retry
</button>
</div>
{/if}
</div>
To use this component, you can import it into your SvelteKit app and use it like this:
<script lang="ts">
import StreamPlayer from '$lib/StreamPlayer.svelte';
</script>
<div class="container mx-auto px-4 my-4">
<div class="flex flex-col gap-4 lg:flex-row">
<Camera text="Camera 1" baseUrl="https://stream1.test.example.com" />
<Camera text="Camera 2" baseUrl="https://stream2.test.example.com"/>
</div>
</div>