1package mpv23import (4 "bufio"5 "encoding/json"6 "errors"7 "log"8 "math"9 "net"10 "sync"11 "sync/atomic"12)1314type Client struct {15 msgID int321617 respMtx *sync.Mutex18 respMap map[int32]chan response1920 propMtx *sync.Mutex21 propID int3222 propChans map[int32]chan interface{}2324 conn net.Conn25 reqChan chan request26 msgChan chan response2728 verbose bool29 ErrChan <-chan error30}3132func NewClient(path string, verbose bool) (*Client, error) {33 conn, err := net.Dial("unix", path)34 if err != nil {35 return nil, err36 }3738 c := &Client{39 msgID: math.MinInt32,40 propID: math.MinInt32,41 respMtx: new(sync.Mutex),42 respMap: make(map[int32]chan response),43 propMtx: new(sync.Mutex),44 propChans: make(map[int32]chan interface{}),45 conn: conn,46 reqChan: make(chan request),47 msgChan: make(chan response),48 verbose: verbose,49 }5051 errChan := make(chan error, 5)52 c.ErrChan = errChan5354 go c.sendLoop(c.reqChan, errChan)55 go c.recvLoop(c.msgChan, errChan)5657 go c.dispatchLoop(c.msgChan)58 return c, nil59}6061func (c *Client) sendLoop(ch <-chan request, errCh chan<- error) {62 for req := range ch {63 err := req.Encode(c.conn)64 if err != nil {65 errCh <- err66 continue67 }6869 // Every message must be terminated with \n.70 _, err = c.conn.Write([]byte("\n"))71 if err != nil {72 errCh <- err73 continue74 }75 }76}7778func (c *Client) recvLoop(ch chan<- response, errCh chan<- error) {79 scanner := bufio.NewScanner(c.conn)80 for scanner.Scan() {81 data := scanner.Bytes()82 c.debug("<-", string(data))8384 var msg response85 err := json.Unmarshal(data, &msg)86 if err != nil {87 errCh <- err88 continue89 }9091 ch <- msg92 }9394 err := scanner.Err()95 if err != nil {96 errCh <- err97 }98}99100func (c *Client) dispatchLoop(ch <-chan response) {101 for {102 msg := <-ch103 if msg.Event == "property-change" {104 go c.handleChange(msg)105 } else if msg.Event == "" {106 go c.handleResp(msg)107 }108 }109}110111func (c *Client) handleResp(msg response) {112 c.respMtx.Lock()113 ch, ok := c.respMap[msg.ReqID]114 c.respMtx.Unlock()115116 if ok {117 ch <- msg118 }119}120121func (c *Client) handleChange(msg response) {122 c.propMtx.Lock()123 ch, ok := c.propChans[msg.ID]124 c.propMtx.Unlock()125126 if ok {127 ch <- msg.Data128 }129}130131func (c *Client) debug(args ...interface{}) {132 const prefix = "[mpv client]"133 if c.verbose {134 argv := append([]interface{}{prefix}, args...)135 log.Println(argv...)136 }137}138139func (c *Client) newReq(name interface{}, args ...interface{}) request {140 argv := append([]interface{}{name}, args...)141142 // Signed integer overflow is well-defined in go.143 id := atomic.AddInt32(&c.msgID, 1)144145 return request{Cmd: argv, ID: id}146}147148func (c *Client) ExecCmd(name string, args ...interface{}) (interface{}, error) {149 req := c.newReq(name, args...)150151 ch := make(chan response)152 defer close(ch)153154 c.respMtx.Lock()155 c.respMap[req.ID] = ch156 c.respMtx.Unlock()157158 // Delete request from respMap on return159 defer func() {160 c.respMtx.Lock()161 delete(c.respMap, req.ID)162 c.respMtx.Unlock()163 }()164165 c.debug("->", req.String())166 c.reqChan <- req167168 resp := <-ch169 if resp.Error != noError {170 return nil, errors.New(resp.Error)171 }172173 return resp.Data, nil174}175176func (c *Client) SetProperty(name string, value interface{}) error {177 _, err := c.ExecCmd("set_property", name, value)178 return err179}180181func (c *Client) GetProperty(name string) (interface{}, error) {182 value, err := c.ExecCmd("get_property", name)183 if err != nil {184 return nil, err185 }186187 return value, nil188}189190func (c *Client) ObserveProperty(name string) (<-chan interface{}, error) {191 ch := make(chan interface{})192193 c.propMtx.Lock()194 id := c.propID195 c.propChans[id] = ch196197 // TODO: Don't reuse existing property IDs on overflow.198 c.propID++199 c.propMtx.Unlock()200201 _, err := c.ExecCmd("observe_property", id, name)202 if err != nil {203 c.propMtx.Lock()204 delete(c.propChans, id)205 c.propMtx.Unlock()206 return nil, err207 }208209 return ch, nil210}