554 lines
14 KiB
Go
554 lines
14 KiB
Go
package player
|
|
|
|
import (
|
|
"git.nobrain.org/r4/dischord/audio"
|
|
"git.nobrain.org/r4/dischord/extractor"
|
|
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
// init seeds the global math/rand source with the current wall-clock time so
// that shuffling differs between program runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
|
|
|
|
type Queue struct {
|
|
Done []extractor.Data
|
|
Playing *extractor.Data
|
|
Ahead []extractor.Data
|
|
AheadUnshuffled []extractor.Data
|
|
ShuffleOffset int
|
|
Paused bool
|
|
Loop bool
|
|
}
|
|
|
|
func (q *Queue) Copy() *Queue {
|
|
res := &Queue{
|
|
Done: append([]extractor.Data{}, q.Done...),
|
|
Playing: nil,
|
|
Ahead: append([]extractor.Data{}, q.Ahead...),
|
|
AheadUnshuffled: append([]extractor.Data{}, q.AheadUnshuffled...),
|
|
ShuffleOffset: q.ShuffleOffset,
|
|
Paused: q.Paused,
|
|
Loop: q.Loop,
|
|
}
|
|
if q.Playing != nil {
|
|
res.Playing = &extractor.Data{}
|
|
*res.Playing = *q.Playing
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (q *Queue) InBounds(i int) bool {
|
|
return !(i == 0 && q.Playing == nil) &&
|
|
!(i < 0 && -i-1 >= len(q.Done)) &&
|
|
!(i > 0 && i-1 >= len(q.Ahead))
|
|
}
|
|
|
|
func (q *Queue) At(i int) *extractor.Data {
|
|
if !q.InBounds(i) {
|
|
return nil
|
|
}
|
|
if i < 0 {
|
|
return &q.Done[len(q.Done)+i]
|
|
} else if i == 0 {
|
|
return q.Playing
|
|
} else {
|
|
return &q.Ahead[i-1]
|
|
}
|
|
}
|
|
|
|
// Client commands
|
|
// Get commands can most easily be executed via the builtin convenience functions
|
|
// NOTE: When using commands like Jump, Swap and Delete, we can't be sure
|
|
// that our versions of the indices correspond to the versions the caller
|
|
// is referring to, but in this context, that should be fine
|
|
type Cmd interface{}
|
|
type CmdPlay struct{}
|
|
type CmdPause struct{}
|
|
type CmdLoop bool
|
|
type CmdJump int // relative track to jump to (e.g. -2, -1, 4)
|
|
type CmdSkipAll struct{}
|
|
type CmdShuffle struct{}
|
|
type CmdUnshuffle struct{}
|
|
type CmdSwap struct{ A, B int }
|
|
type CmdDelete []int
|
|
type CmdAddFront []extractor.Data
|
|
type CmdAddBack []extractor.Data
|
|
type CmdSeek float64 // seconds
|
|
type CmdSpeed float64 // speed factor
|
|
type CmdPlayFileAndStop struct {
|
|
DoneCh chan<- struct{}
|
|
Data []byte
|
|
}
|
|
type CmdGetTime chan<- float64
|
|
type CmdGetQueue chan<- *Queue
|
|
type CmdGetSpeed chan<- float64
|
|
|
|
type Client struct {
|
|
CmdCh chan<- Cmd
|
|
// ErrCh is blocking meaning that you will have to constantly read it,
|
|
// either using another goroutine or a select statement whenever sending a
|
|
// command.
|
|
ErrCh <-chan error
|
|
}
|
|
|
|
func (c Client) GetTime() float64 {
|
|
ch := make(chan float64)
|
|
c.CmdCh <- CmdGetTime(ch)
|
|
return <-ch
|
|
}
|
|
|
|
func (c Client) GetQueue() *Queue {
|
|
ch := make(chan *Queue)
|
|
c.CmdCh <- CmdGetQueue(ch)
|
|
return <-ch
|
|
}
|
|
|
|
func (c Client) GetSpeed() float64 {
|
|
ch := make(chan float64)
|
|
c.CmdCh <- CmdGetSpeed(ch)
|
|
return <-ch
|
|
}
|
|
|
|
type (
	// Event is any of the Event* types below.
	Event interface{}
	// EventStreamUpdated is passed to callbacks whenever the player has
	// (re)started or reset its audio stream.
	EventStreamUpdated struct{}
	// EventKilled is passed to callbacks when the player shuts down because
	// its command channel was closed.
	EventKilled struct{}

	// Callback is a function taking exactly one of the Event* types, i.e.
	// func(EventStreamUpdated) or func(EventKilled).
	Callback interface{}
)
|
|
|
|
// Creates a new player client that will run in parallel and receive commands
|
|
// via the returned Client.CmdCh. All audio will be sent via the given outCh.
|
|
// Closing the returned Client.CmdCh channel acts as a kill signal.
|
|
func NewClient(excfg extractor.Config, ffmpegPath string, outCh chan<- []byte, callbacks ...Callback) Client {
|
|
// Client channels
|
|
cCmdCh := make(chan Cmd)
|
|
cErrCh := make(chan error)
|
|
|
|
// Callback setup
|
|
var callbacksStreamUpdated []func(EventStreamUpdated)
|
|
var callbacksKilled []func(EventKilled)
|
|
for _, c := range callbacks {
|
|
switch v := c.(type) {
|
|
case func(EventStreamUpdated):
|
|
callbacksStreamUpdated = append(callbacksStreamUpdated, v)
|
|
case func(EventKilled):
|
|
callbacksKilled = append(callbacksKilled, v)
|
|
default:
|
|
panic("player.NewClient(): invalid callback function type: " + fmt.Sprintf("%T", v))
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
nFrames := 0
|
|
tStart := 0.0
|
|
playbackSpeed := 1.0
|
|
|
|
var queue Queue
|
|
|
|
lastStreamErr := time.Unix(0, 0)
|
|
|
|
var filePlaybackDoneCh chan<- struct{}
|
|
|
|
// Mostly notes to self:
|
|
// This entire setup is pretty fragile, so I'll try to explain it:
|
|
// Each stream consists of the three fundamental channels below.
|
|
// - audioch sends us the encoded audio frames; if audioch is closed,
|
|
// that means the stream exited successfully
|
|
// - errch sends any potential error (singular!): if any error occurs,
|
|
// the client automatically shuts down
|
|
// - killch is where we can send a manual kill signal; no error or
|
|
// shutdown confirmation will follow
|
|
//
|
|
// Now, it is important here that any time a stream is terminated (either
|
|
// by a user or the stream terminates itself by closing audio ch or
|
|
// sending an error) all three channels are set to nil. Otherwise (and
|
|
// I have spent lots of time debugging this), the goroutine will just
|
|
// lock up trying to send a kill signal through a channel which no
|
|
// longer has a receiver.
|
|
//
|
|
// To achieve a kind of stability, I try to follow the basics rule that:
|
|
// - any time I do manipulate queue.Playing, I kill the current
|
|
// stream first
|
|
// - there are exactly three ways a stream can end and all have to be
|
|
// handled separately: termination by the user (only through
|
|
// calling killStream), exiting with an error (handled in select:
|
|
// case err, ok := <-errch), and exiting successfully by closing
|
|
// audioch (handled in select: case cmd, ok := <-cCmdCh: if !ok)
|
|
var audioch <-chan []byte
|
|
var errch <-chan error
|
|
var killch chan<- struct{}
|
|
|
|
getPlaybackTime := func() float64 {
|
|
return tStart + float64(nFrames)*audio.FrameDuration*playbackSpeed
|
|
}
|
|
|
|
getMaxCachedPlaybackTime := func() float64 {
|
|
return tStart + float64(nFrames+len(audioch))*audio.FrameDuration*playbackSpeed
|
|
}
|
|
|
|
readAudioCh := func() <-chan []byte {
|
|
if queue.Paused {
|
|
return nil
|
|
} else {
|
|
return audioch
|
|
}
|
|
}
|
|
|
|
killStream := func() {
|
|
if killch != nil {
|
|
killch <- struct{}{}
|
|
audioch = nil
|
|
errch = nil
|
|
killch = nil
|
|
}
|
|
}
|
|
|
|
var jumpTracks func(nRel int)
|
|
|
|
refreshStream := func(seek float64, speed float64) {
|
|
if queue.Playing == nil {
|
|
// Reset stream info
|
|
nFrames = 0
|
|
tStart = 0.0
|
|
playbackSpeed = 1.0
|
|
} else {
|
|
// Kill the potential current stream
|
|
killStream()
|
|
|
|
// Refresh stream URL if necessary
|
|
if queue.Playing.StreamUrl == "" || time.Now().After(queue.Playing.Expires) {
|
|
var data []extractor.Data
|
|
var err error
|
|
for {
|
|
data, err = extractor.Extract(excfg, queue.Playing.SourceUrl)
|
|
if err == nil {
|
|
break
|
|
} else {
|
|
now := time.Now()
|
|
if lastStreamErr.Sub(now) > 5*time.Second {
|
|
lastStreamErr = now
|
|
continue
|
|
} else {
|
|
jumpTracks(1)
|
|
lastStreamErr = now
|
|
cErrCh <- errors.New("skipping stream due to multiple errors")
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if err == nil {
|
|
if len(data) == 1 {
|
|
*queue.Playing = data[0]
|
|
} else {
|
|
cErrCh <- errors.New("got invalid data refreshing stream")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get new stream
|
|
audioch, errch, killch = audio.StreamToDiscordOpus(ffmpegPath, queue.Playing.StreamUrl, nil, seek, speed, true)
|
|
|
|
// Reset stream info
|
|
nFrames = 0
|
|
tStart = seek
|
|
playbackSpeed = speed
|
|
}
|
|
|
|
for _, c := range callbacksStreamUpdated {
|
|
c(EventStreamUpdated{})
|
|
}
|
|
}
|
|
|
|
// Queue overflow safe
|
|
jumpTracks = func(nRel int) {
|
|
// Kill the potential current stream
|
|
killStream()
|
|
|
|
if nRel > 0 && nRel > len(queue.Ahead) {
|
|
nRel = len(queue.Ahead)
|
|
if nRel == 0 {
|
|
nRel = 1
|
|
}
|
|
} else if nRel < 0 && -nRel > len(queue.Done) {
|
|
nRel = len(queue.Done)
|
|
if nRel == 0 {
|
|
nRel = -1
|
|
}
|
|
}
|
|
|
|
// We can imagine this algorithm like a tape where A B C D E are
|
|
// the items, B is currently playing and we want to skip 2 tracks
|
|
// ahead (D: queue.Done, P: queue.Playing, A: queue.Ahead):
|
|
// A [B] C D E
|
|
// D: A; P: B; A: C D E
|
|
if nRel > 0 {
|
|
// A B [] C D E
|
|
// D: A B, P: , A: C D E
|
|
if queue.Playing != nil {
|
|
queue.Done = append(queue.Done, *queue.Playing)
|
|
}
|
|
|
|
// A B C [] D E
|
|
// D: A B C, P: , A: D E
|
|
queue.Done = append(queue.Done, queue.Ahead[:nRel-1]...)
|
|
queue.Ahead = queue.Ahead[nRel-1:]
|
|
|
|
// A B C [D] E
|
|
// D: A B C, P: D, A: E
|
|
if len(queue.Ahead) > 0 {
|
|
queue.Playing = new(extractor.Data)
|
|
*queue.Playing = queue.Ahead[0]
|
|
queue.Ahead = queue.Ahead[1:]
|
|
} else {
|
|
queue.Playing = nil
|
|
}
|
|
|
|
queue.ShuffleOffset -= nRel
|
|
} else if nRel < 0 {
|
|
// The same thing in reverse
|
|
|
|
nRel *= -1
|
|
|
|
if queue.Playing != nil {
|
|
queue.Ahead = append([]extractor.Data{*queue.Playing}, queue.Ahead...)
|
|
}
|
|
|
|
ql := len(queue.Done)
|
|
queue.Ahead = append(queue.Done[ql-(nRel-1):ql], queue.Ahead...)
|
|
queue.Done = queue.Done[:ql-(nRel-1)]
|
|
|
|
if len(queue.Done) > 0 {
|
|
ql := len(queue.Done)
|
|
queue.Playing = new(extractor.Data)
|
|
*queue.Playing = queue.Done[ql-1]
|
|
queue.Done = queue.Done[:ql-1]
|
|
} else {
|
|
queue.Playing = nil
|
|
}
|
|
|
|
queue.ShuffleOffset += nRel
|
|
}
|
|
|
|
// Update stream
|
|
refreshStream(0, playbackSpeed)
|
|
}
|
|
|
|
var unshuffle func()
|
|
|
|
shuffle := func() {
|
|
if queue.AheadUnshuffled != nil {
|
|
unshuffle()
|
|
}
|
|
queue.AheadUnshuffled = append([]extractor.Data{}, queue.Ahead...)
|
|
rand.Shuffle(len(queue.Ahead), func(i, j int) {
|
|
queue.Ahead[i], queue.Ahead[j] = queue.Ahead[j], queue.Ahead[i]
|
|
})
|
|
queue.ShuffleOffset = 0
|
|
}
|
|
|
|
unshuffle = func() {
|
|
if queue.AheadUnshuffled == nil {
|
|
return
|
|
}
|
|
if queue.ShuffleOffset <= 0 {
|
|
if -queue.ShuffleOffset <= len(queue.AheadUnshuffled) {
|
|
queue.AheadUnshuffled = queue.AheadUnshuffled[-queue.ShuffleOffset:]
|
|
}
|
|
queue.ShuffleOffset = 0
|
|
}
|
|
queue.Ahead = append(queue.Ahead[:queue.ShuffleOffset], queue.AheadUnshuffled...)
|
|
queue.AheadUnshuffled = nil
|
|
}
|
|
|
|
// Main IO loop
|
|
for {
|
|
select {
|
|
case frame, ok := <-readAudioCh():
|
|
if ok {
|
|
outCh <- frame
|
|
nFrames++
|
|
} else {
|
|
// Audio channel was closed -> stream is finished -> reset all stream channels
|
|
audioch = nil
|
|
errch = nil
|
|
killch = nil
|
|
|
|
if filePlaybackDoneCh != nil {
|
|
filePlaybackDoneCh <- struct{}{}
|
|
}
|
|
filePlaybackDoneCh = nil
|
|
|
|
fmt.Println("Audio channel closed, going to next track")
|
|
if queue.Loop {
|
|
refreshStream(0, playbackSpeed)
|
|
} else {
|
|
jumpTracks(1)
|
|
}
|
|
}
|
|
case err, ok := <-errch:
|
|
if ok {
|
|
// Propagate error
|
|
cErrCh <- err
|
|
|
|
// Stream has closed with error -> reset all of its channels
|
|
audioch = nil
|
|
errch = nil
|
|
killch = nil
|
|
|
|
// Try to resurrect stream (if it fails again in the
|
|
// next 5 seconds, we'll skip the track instead)
|
|
now := time.Now()
|
|
if lastStreamErr.Sub(now) > 5*time.Second {
|
|
refreshStream(getPlaybackTime(), playbackSpeed)
|
|
} else {
|
|
jumpTracks(1)
|
|
cErrCh <- errors.New("skipping stream due to multiple errors")
|
|
}
|
|
lastStreamErr = now
|
|
} else {
|
|
// Stream is done without err, but not fully read yet -> block
|
|
// all future errch reads
|
|
// Also, killch is now unnecessary
|
|
errch = nil
|
|
killch = nil
|
|
}
|
|
case cmd, ok := <-cCmdCh:
|
|
if !ok {
|
|
// cCmdCh was closed by the user -> client is told to shut down
|
|
fmt.Println("Command channel closed, killing client")
|
|
killStream()
|
|
for _, c := range callbacksKilled {
|
|
c(EventKilled{})
|
|
}
|
|
return
|
|
} else {
|
|
switch v := cmd.(type) {
|
|
case CmdPlay:
|
|
queue.Paused = false
|
|
if audioch == nil {
|
|
jumpTracks(1)
|
|
}
|
|
case CmdPause:
|
|
queue.Paused = true
|
|
case CmdLoop:
|
|
queue.Loop = bool(v)
|
|
case CmdJump:
|
|
jumpTracks(int(v))
|
|
case CmdSkipAll:
|
|
killStream()
|
|
if queue.Playing != nil {
|
|
queue.Done = append(queue.Done, *queue.Playing)
|
|
queue.Playing = nil
|
|
}
|
|
queue.Done = append(queue.Done, queue.Ahead...)
|
|
queue.Ahead = nil
|
|
case CmdShuffle:
|
|
shuffle()
|
|
case CmdUnshuffle:
|
|
unshuffle()
|
|
case CmdSwap:
|
|
queue.AheadUnshuffled = nil
|
|
sw := struct{ A, B int }(v)
|
|
if queue.InBounds(sw.A) && queue.InBounds(sw.B) {
|
|
replacePlaying := sw.A == 0 || sw.B == 0
|
|
if replacePlaying {
|
|
killStream()
|
|
}
|
|
*queue.At(sw.A), *queue.At(sw.B) = *queue.At(sw.B), *queue.At(sw.A)
|
|
if replacePlaying {
|
|
refreshStream(0, playbackSpeed)
|
|
}
|
|
}
|
|
case CmdDelete:
|
|
queue.AheadUnshuffled = nil
|
|
idxs := []int(v)
|
|
|
|
// Sort indices descendingly by absolute value so we don't
|
|
// mess the future indices up in the process of removal
|
|
sort.Slice(idxs, func(i, j int) bool {
|
|
abs := func(i int) int {
|
|
if i < 0 {
|
|
return -i
|
|
}
|
|
return i
|
|
}
|
|
return idxs[abs(j)] < idxs[abs(i)]
|
|
})
|
|
|
|
for _, i := range idxs {
|
|
if i < 0 {
|
|
i = len(queue.Done) + i
|
|
if i < len(queue.Done) {
|
|
queue.Done = append(queue.Done[:i], queue.Done[i+1:]...)
|
|
}
|
|
} else if i == 0 {
|
|
killStream()
|
|
queue.Playing = nil
|
|
refreshStream(0, playbackSpeed)
|
|
} else {
|
|
i -= 1
|
|
if i < len(queue.Ahead) {
|
|
queue.Ahead = append(queue.Ahead[:i], queue.Ahead[i+1:]...)
|
|
}
|
|
}
|
|
}
|
|
case CmdAddFront:
|
|
queue.Ahead = append([]extractor.Data(v), queue.Ahead...)
|
|
queue.ShuffleOffset++
|
|
case CmdAddBack:
|
|
queue.Ahead = append(queue.Ahead, []extractor.Data(v)...)
|
|
case CmdSeek:
|
|
if float64(v) > getPlaybackTime() && float64(v) < getMaxCachedPlaybackTime() {
|
|
fmt.Println("Quick seeking to", v)
|
|
// Seek to location in buffer
|
|
for getPlaybackTime() < float64(v) {
|
|
_, ok := <-audioch
|
|
if !ok {
|
|
break
|
|
}
|
|
nFrames++
|
|
}
|
|
} else {
|
|
fmt.Println("Slow seeking to", v)
|
|
// Restart stream from other location (seek using ffmpeg)
|
|
refreshStream(float64(v), playbackSpeed)
|
|
}
|
|
case CmdSpeed:
|
|
refreshStream(getPlaybackTime(), float64(v))
|
|
case CmdPlayFileAndStop:
|
|
cmd := struct {
|
|
DoneCh chan<- struct{}
|
|
Data []byte
|
|
}(v)
|
|
|
|
audioch, errch, killch = audio.StreamToDiscordOpus(ffmpegPath, "pipe:", bytes.NewReader(cmd.Data), 0, 1.0, false)
|
|
|
|
// Reset stream info
|
|
nFrames = 0
|
|
tStart = 0
|
|
playbackSpeed = 1.0
|
|
|
|
queue.Paused = false
|
|
queue.Loop = false
|
|
|
|
filePlaybackDoneCh = cmd.DoneCh
|
|
case CmdGetTime:
|
|
v <- getPlaybackTime()
|
|
case CmdGetQueue:
|
|
v <- queue.Copy()
|
|
case CmdGetSpeed:
|
|
v <- playbackSpeed
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return Client{CmdCh: cCmdCh, ErrCh: cErrCh}
|
|
}
|