Skip to content
Snippets Groups Projects
Commit 6ba297bf authored by me5na7qbjqbrp's avatar me5na7qbjqbrp
Browse files

Merge branch 'multicast' into 'golang'

Multicast

See merge request !1
parents ad26242d b647c761
No related branches found
No related tags found
1 merge request!1Multicast
Pipeline #4184 passed with stages
in 2 minutes and 7 seconds
......@@ -46,3 +46,9 @@ webrtc:
# STUN servers, you should host your own Coturn instance
STUNServers:
- stun:stun.l.google.com:19302
# Configuration for the multicast feature
multicast:
outputs:
# demo:
# - rtmp://localhost:1925
......@@ -3,6 +3,7 @@
package main
import (
"gitlab.crans.org/nounous/ghostream/stream/multicast"
"log"
"strings"
......@@ -57,6 +58,7 @@ func loadConfiguration() {
viper.SetDefault("WebRTC.MinPortUDP", 10000)
viper.SetDefault("WebRTC.MaxPortUDP", 10005)
viper.SetDefault("WebRTC.STUNServers", []string{"stun:stun.l.google.com:19302"})
viper.SetDefault("Multicast.Outputs", make(map[string][]string))
// Copy STUN configuration to clients
viper.Set("Web.STUNServers", viper.Get("WebRTC.STUNServers"))
......@@ -68,6 +70,7 @@ func main() {
cfg := struct {
Auth auth.Options
Monitoring monitoring.Options
Multicast multicast.Options
Srt srt.Options
Web web.Options
WebRTC webrtc.Options
......@@ -93,6 +96,12 @@ func main() {
go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC)
// Init multicast
err = multicast.New(&cfg.Multicast)
if err != nil {
log.Fatalln("Failed to load multicast app:", err)
}
// Wait for routines
select {}
}
package main
import (
"github.com/spf13/viper"
"gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/multicast"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
"gitlab.crans.org/nounous/ghostream/web"
"testing"
)
// TestLoadConfiguration tests the configuration file loading and the init of some parameters
func TestLoadConfiguration(t *testing.T) {
loadConfiguration()
cfg := struct {
Auth auth.Options
Monitoring monitoring.Options
Multicast multicast.Options
Srt srt.Options
Web web.Options
WebRTC webrtc.Options
}{}
if err := viper.Unmarshal(&cfg); err != nil {
t.Fatal("Failed to load settings", err)
}
}
package multicast
package multicast
import (
"bufio"
"io"
"log"
"os/exec"
)
// Options to configure the multicast:
//for each stream key, we can have several additional stream URL where the main stream is redirected to
type Options struct {
Outputs map[string][]string
}
var (
options Options
ffmpegInstances = make(map[string]*exec.Cmd)
ffmpegInputStreams = make(map[string]*io.WriteCloser)
)
// New Load configuration
func New(cfg *Options) error {
options = *cfg
return nil
}
// RegisterStream Declare a new open stream and create ffmpeg instances
func RegisterStream(streamKey string) {
if len(options.Outputs[streamKey]) == 0 {
return
}
params := []string{"-re", "-i", "pipe:0"}
for _, stream := range options.Outputs[streamKey] {
params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency",
"-c", "copy", stream)
}
// Launch FFMPEG instance
ffmpeg := exec.Command("ffmpeg", params...)
// Open pipes
input, err := ffmpeg.StdinPipe()
if err != nil {
panic(err)
}
output, err := ffmpeg.StdoutPipe()
if err != nil {
panic(err)
}
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
panic(err)
}
ffmpegInstances[streamKey] = ffmpeg
ffmpegInputStreams[streamKey] = &input
if err := ffmpeg.Start(); err != nil {
panic(err)
}
// Log ffmpeg output
go func() {
scanner := bufio.NewScanner(output)
for scanner.Scan() {
log.Println("[FFMPEG " + streamKey + "] " + scanner.Text())
}
}()
// Log also error output
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text())
}
}()
}
// SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key
func SendPacket(streamKey string, data []byte) {
stdin := ffmpegInputStreams[streamKey]
_, err := (*stdin).Write(data)
if err != nil {
log.Println("Error while sending a packet to external streaming server for key "+streamKey, err)
}
}
// CloseConnection When the stream is ended, close FFMPEG instances
func CloseConnection(streamKey string) {
ffmpeg := ffmpegInstances[streamKey]
if err := ffmpeg.Process.Kill(); err != nil {
panic(err)
}
delete(ffmpegInstances, streamKey)
delete(ffmpegInputStreams, streamKey)
}
package srt
import (
"gitlab.crans.org/nounous/ghostream/stream/multicast"
"log"
"net"
"strconv"
......@@ -56,26 +57,35 @@ func Serve(cfg *Options) {
// Create a new buffer
buff := make([]byte, 2048)
// Setup linked multicasts
multicast.RegisterStream("demo") // FIXME Replace with real stream key
// Read RTP packets forever and send them to the WebRTC Client
for {
n, err := s.Read(buff, 10000)
if err != nil {
log.Println("Error occured while reading SRT socket:", err)
multicast.CloseConnection("demo")
break
}
if n == 0 {
// End of stream
log.Printf("Received no bytes, stopping stream.")
multicast.CloseConnection("demo")
break
}
log.Printf("Received %d bytes", n)
// Send raw packet to other streams
multicast.SendPacket("demo", buff[:n])
// Unmarshal incoming packet
packet := &rtp.Packet{}
if err := packet.Unmarshal(buff[:n]); err != nil {
log.Println("Error occured while unmarshaling SRT:", err)
multicast.CloseConnection("demo")
break
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment