mpvfs

9P file server for controlling mpv playback

git clone https://git.8pit.net/mpvfs.git

  1package mpv
  2
  3import (
  4	"bufio"
  5	"encoding/json"
  6	"errors"
  7	"log"
  8	"math"
  9	"net"
 10	"sync"
 11	"sync/atomic"
 12)
 13
 14type Client struct {
 15	msgID int32
 16
 17	respMtx *sync.Mutex
 18	respMap map[int32]chan response
 19
 20	propMtx   *sync.Mutex
 21	propID    int32
 22	propChans map[int32]chan interface{}
 23
 24	conn    net.Conn
 25	reqChan chan request
 26	msgChan chan response
 27
 28	verbose bool
 29	ErrChan <-chan error
 30}
 31
 32func NewClient(path string, verbose bool) (*Client, error) {
 33	conn, err := net.Dial("unix", path)
 34	if err != nil {
 35		return nil, err
 36	}
 37
 38	c := &Client{
 39		msgID:     math.MinInt32,
 40		propID:    math.MinInt32,
 41		respMtx:   new(sync.Mutex),
 42		respMap:   make(map[int32]chan response),
 43		propMtx:   new(sync.Mutex),
 44		propChans: make(map[int32]chan interface{}),
 45		conn:      conn,
 46		reqChan:   make(chan request),
 47		msgChan:   make(chan response),
 48		verbose:   verbose,
 49	}
 50
 51	errChan := make(chan error, 5)
 52	c.ErrChan = errChan
 53
 54	go c.sendLoop(c.reqChan, errChan)
 55	go c.recvLoop(c.msgChan, errChan)
 56
 57	go c.dispatchLoop(c.msgChan)
 58	return c, nil
 59}
 60
 61func (c *Client) sendLoop(ch <-chan request, errCh chan<- error) {
 62	for req := range ch {
 63		err := req.Encode(c.conn)
 64		if err != nil {
 65			errCh <- err
 66			continue
 67		}
 68
 69		// Every message must be terminated with \n.
 70		_, err = c.conn.Write([]byte("\n"))
 71		if err != nil {
 72			errCh <- err
 73			continue
 74		}
 75	}
 76}
 77
 78func (c *Client) recvLoop(ch chan<- response, errCh chan<- error) {
 79	scanner := bufio.NewScanner(c.conn)
 80	for scanner.Scan() {
 81		data := scanner.Bytes()
 82		c.debug("<-", string(data))
 83
 84		var msg response
 85		err := json.Unmarshal(data, &msg)
 86		if err != nil {
 87			errCh <- err
 88			continue
 89		}
 90
 91		ch <- msg
 92	}
 93
 94	err := scanner.Err()
 95	if err != nil {
 96		errCh <- err
 97	}
 98}
 99
100func (c *Client) dispatchLoop(ch <-chan response) {
101	for {
102		msg := <-ch
103		if msg.Event == "property-change" {
104			go c.handleChange(msg)
105		} else if msg.Event == "" {
106			go c.handleResp(msg)
107		}
108	}
109}
110
111func (c *Client) handleResp(msg response) {
112	c.respMtx.Lock()
113	ch, ok := c.respMap[msg.ReqID]
114	c.respMtx.Unlock()
115
116	if ok {
117		ch <- msg
118	}
119}
120
121func (c *Client) handleChange(msg response) {
122	c.propMtx.Lock()
123	ch, ok := c.propChans[msg.ID]
124	c.propMtx.Unlock()
125
126	if ok {
127		ch <- msg.Data
128	}
129}
130
131func (c *Client) debug(args ...interface{}) {
132	const prefix = "[mpv client]"
133	if c.verbose {
134		argv := append([]interface{}{prefix}, args...)
135		log.Println(argv...)
136	}
137}
138
139func (c *Client) newReq(name interface{}, args ...interface{}) request {
140	argv := append([]interface{}{name}, args...)
141
142	// Signed integer overflow is well-defined in go.
143	id := atomic.AddInt32(&c.msgID, 1)
144
145	return request{Cmd: argv, ID: id}
146}
147
148func (c *Client) ExecCmd(name string, args ...interface{}) (interface{}, error) {
149	req := c.newReq(name, args...)
150
151	ch := make(chan response)
152	defer close(ch)
153
154	c.respMtx.Lock()
155	c.respMap[req.ID] = ch
156	c.respMtx.Unlock()
157
158	// Delete request from respMap on return
159	defer func() {
160		c.respMtx.Lock()
161		delete(c.respMap, req.ID)
162		c.respMtx.Unlock()
163	}()
164
165	c.debug("->", req.String())
166	c.reqChan <- req
167
168	resp := <-ch
169	if resp.Error != noError {
170		return nil, errors.New(resp.Error)
171	}
172
173	return resp.Data, nil
174}
175
176func (c *Client) SetProperty(name string, value interface{}) error {
177	_, err := c.ExecCmd("set_property", name, value)
178	return err
179}
180
181func (c *Client) GetProperty(name string) (interface{}, error) {
182	value, err := c.ExecCmd("get_property", name)
183	if err != nil {
184		return nil, err
185	}
186
187	return value, nil
188}
189
190func (c *Client) ObserveProperty(name string) (<-chan interface{}, error) {
191	ch := make(chan interface{})
192
193	c.propMtx.Lock()
194	id := c.propID
195	c.propChans[id] = ch
196
197	// TODO: Don't reuse existing property IDs on overflow.
198	c.propID++
199	c.propMtx.Unlock()
200
201	_, err := c.ExecCmd("observe_property", id, name)
202	if err != nil {
203		c.propMtx.Lock()
204		delete(c.propChans, id)
205		c.propMtx.Unlock()
206		return nil, err
207	}
208
209	return ch, nil
210}