diff --git a/client.go b/client.go index 1e1c06d..587de06 100644 --- a/client.go +++ b/client.go @@ -235,21 +235,29 @@ func (c *Client) String() string { // execute forever. Use Client.Quit() first if you want to disconnect the // client from the server/connection gracefully. func (c *Client) Close(sendQuit bool) { + c.RunHandlers(&Event{Command: STOPPED, Trailing: c.Server()}) if sendQuit { c.Send(&Event{Command: QUIT, Trailing: "closing"}) + + // Give ourselves a bit of padding so we can let everyone know we're + // quitting. + time.Sleep(2 * time.Second) } _ = c.conn.Close() - c.RunHandlers(&Event{Command: STOPPED, Trailing: c.Server()}) } -func (c *Client) execLoop(done chan struct{}) { +func (c *Client) execLoop(done chan struct{}, wg *sync.WaitGroup) { + c.debug.Print("starting execLoop") + defer c.debug.Print("closing execLoop") + for { select { + case <-done: + wg.Done() + return case event := <-c.rx: c.RunHandlers(event) - case <-done: - return } } } diff --git a/conn.go b/conn.go index 812396d..21c9dc6 100644 --- a/conn.go +++ b/conn.go @@ -7,7 +7,6 @@ package girc import ( "bufio" "crypto/tls" - "errors" "fmt" "net" "net/url" @@ -47,11 +46,29 @@ type ircConn struct { pingDelay time.Duration } +// ErrInvalidConfig is returned when the configuration passed to the client +// is invalid. +type ErrInvalidConfig struct { + Conf Config // Conf is the configuration that was not valid. + err error +} + +func (e ErrInvalidConfig) Error() string { return "invalid configuration: " + e.err.Error() } + +// ErrProxy is returned when an attempt to use the supplied proxy resulted +// in error, with implementation or connection. +type ErrProxy struct { + Bind string // Bind is the query string address that was supplied. + err error +} + +func (e ErrProxy) Error() string { return fmt.Sprintf("proxy error: %q: %s", e.Bind, e.err) } + // newConn sets up and returns a new connection to the server. This includes // setting up things like proxies, ssl/tls, and other misc. things. func newConn(conf Config, addr string) (*ircConn, error) { if err := conf.isValid(); err != nil { - return nil, fmt.Errorf("invalid configuration: %s", err) + return nil, ErrInvalidConfig{conf, err} } var conn net.Conn @@ -63,7 +80,7 @@ func newConn(conf Config, addr string) (*ircConn, error) { var local *net.TCPAddr local, err = net.ResolveTCPAddr("tcp", conf.Bind+":0") if err != nil { - return nil, fmt.Errorf("unable to resolve bind address %s: %s", conf.Bind, err) + return nil, err } dialer.LocalAddr = local @@ -73,24 +90,20 @@ func newConn(conf Config, addr string) (*ircConn, error) { var proxyURI *url.URL var proxyDialer proxy.Dialer - proxyURI, err = url.Parse(conf.Proxy) - if err != nil { - return nil, fmt.Errorf("unable to use proxy %q: %s", conf.Proxy, err) + if proxyURI, err = url.Parse(conf.Proxy); err != nil { + return nil, ErrProxy{conf.Proxy, err} } - proxyDialer, err = proxy.FromURL(proxyURI, dialer) - if err != nil { - return nil, fmt.Errorf("unable to use proxy %q: %s", conf.Proxy, err) + if proxyDialer, err = proxy.FromURL(proxyURI, dialer); err != nil { + return nil, ErrProxy{conf.Proxy, err} } - conn, err = proxyDialer.Dial("tcp", addr) - if err != nil { - return nil, fmt.Errorf("unable to connect to proxy %q: %s", conf.Proxy, err) + if conn, err = proxyDialer.Dial("tcp", addr); err != nil { + return nil, ErrProxy{conf.Proxy, err} } } else { - conn, err = dialer.Dial("tcp", addr) - if err != nil { - return nil, fmt.Errorf("unable to connect to %q: %s", addr, err) + if conn, err = dialer.Dial("tcp", addr); err != nil { + return nil, ErrProxy{conf.Proxy, err} } } @@ -116,15 +129,33 @@ func newConn(conf Config, addr string) (*ircConn, error) { return c, nil } +func newMockConn(conn net.Conn) *ircConn { + ctime := time.Now() + c := &ircConn{ + sock: conn, + connTime: &ctime, + connected: true, + } + c.newReadWriter() + + return c +} + +// ErrParseEvent is returned when an event cannot be parsed with ParseEvent(). +type ErrParseEvent struct { + Line string +} + +func (e ErrParseEvent) Error() string { return "unable to parse event: " + e.Line } + func (c *ircConn) decode() (event *Event, err error) { line, err := c.io.ReadString(delim) if err != nil { return nil, err } - event = ParseEvent(line) - if event == nil { - return nil, fmt.Errorf("unable to parse incoming event: %s", event) + if event = ParseEvent(line); event == nil { + return nil, ErrParseEvent{line} } return event, nil @@ -159,35 +190,97 @@ func (c *ircConn) Close() error { return c.sock.Close() } -// Connect attempts to connect to the given IRC server +// Connect attempts to connect to the given IRC server. Returns only when +// an error has occurred, or a disconnect was requested with Close(). Connect +// will only return once all goroutines have been closed to ensure there are +// no long-running routines becoming backed up. This also means that this +// will wait for all non-background handlers to complete. func (c *Client) Connect() error { + return c.internalConnect(nil) +} + +// MockConnect is used to implement mocking with an IRC server. Supply a net.Conn +// that will be used to spoof the server. A useful way to do this is to so +// net.Pipe(), pass one end into MockConnect(), and the other end into +// bufio.NewReader(). +// +// For example: +// +// client := girc.New(girc.Config{ +// Server: "dummy.int", +// Port: 6667, +// Nick: "test", +// User: "test", +// Name: "Testing123", +// }) +// +// in, out := net.Pipe() +// defer in.Close() +// defer out.Close() +// b := bufio.NewReader(in) +// +// go func() { +// if err := client.MockConnect(out); err != nil { +// panic(err) +// } +// }() +// +// defer client.Close(false) +// +// for { +// in.SetReadDeadline(time.Now().Add(300 * time.Second)) +// line, err := b.ReadString(byte('\n')) +// if err != nil { +// panic(err) +// } +// +// event := girc.ParseEvent(line) +// +// if event == nil { +// continue +// } +// +// // Do stuff with event here. +// } +func (c *Client) MockConnect(conn net.Conn) error { + return c.internalConnect(conn) +} + +func (c *Client) internalConnect(mock net.Conn) error { // We want to be the only one handling connects/disconnects right now. c.cmux.Lock() // Reset the state. c.state = newState() - // Validate info, and actually make the connection. - c.debug.Printf("connecting to %s...", c.Server()) - conn, err := newConn(c.Config, c.Server()) - if err != nil { - c.cmux.Unlock() - return err + if mock == nil { + // Validate info, and actually make the connection. + c.debug.Printf("connecting to %s...", c.Server()) + conn, err := newConn(c.Config, c.Server()) + if err != nil { + c.cmux.Unlock() + return err + } + + c.conn = conn + } else { + c.conn = newMockConn(mock) } - c.conn = conn c.cmux.Unlock() // Start read loop to process messages from the server. - errs := make(chan error, 4) + errs := make(chan error, 3) done := make(chan struct{}, 4) - defer close(errs) - defer close(done) + var wg sync.WaitGroup + // 4 being the number of goroutines we need to finish when this function + // returns. + wg.Add(4) - go c.execLoop(done) - go c.readLoop(errs, done) - go c.pingLoop(errs, done) - go c.sendLoop(errs, done) + go c.execLoop(done, &wg) + go c.readLoop(errs, done, &wg) + go c.pingLoop(errs, done, &wg) + go c.sendLoop(errs, done, &wg) // Send a virtual event allowing hooks for successful socket connection. c.RunHandlers(&Event{Command: INITIALIZED, Trailing: c.Server()}) @@ -211,27 +304,50 @@ func (c *Client) Connect() error { // support. c.listCAP() - return <-errs + // Wait for the first error. + err := <-errs + + c.conn.mu.Lock() + c.conn.connected = false + c.conn.mu.Unlock() + + // Once we have our error, let all other functions know we're done. + c.debug.Print("waiting for all routines to finish") + close(done) + // Wait for all goroutines to finish. + wg.Wait() + + close(errs) + + // Make sure that the connection is closed if not already. + c.cmux.Lock() + _ = c.conn.Close() + c.cmux.Unlock() + + return err } // readLoop sets a timeout of 300 seconds, and then attempts to read from the // IRC server. If there is an error, it calls Reconnect. -func (c *Client) readLoop(errs chan error, done chan struct{}) { +func (c *Client) readLoop(errs chan error, done chan struct{}, wg *sync.WaitGroup) { + c.debug.Print("starting readLoop") + defer c.debug.Print("closing readLoop") + var event *Event var err error for { select { case <-done: + wg.Done() return default: // c.conn.sock.SetDeadline(time.Now().Add(300 * time.Second)) event, err = c.conn.decode() if err != nil { - // Attempt a reconnect (if applicable). If it fails, send - // the error to c.Config.HandleError to be dealt with, if - // the handler exists. errs <- err + wg.Done() + return } c.rx <- event @@ -275,13 +391,14 @@ func (c *ircConn) rate(chars int) time.Duration { return 0 } -func (c *Client) sendLoop(errs chan error, done chan struct{}) { +func (c *Client) sendLoop(errs chan error, done chan struct{}, wg *sync.WaitGroup) { + c.debug.Print("starting sendLoop") + defer c.debug.Print("closing sendLoop") + var err error for { select { - case <-done: - return case event := <-c.tx: // Log the event. if !event.Sensitive { @@ -310,8 +427,12 @@ func (c *Client) sendLoop(errs chan error, done chan struct{}) { if err != nil { errs <- err + wg.Done() return } + case <-done: + wg.Done() + return } } } @@ -327,11 +448,25 @@ func (c *Client) flushTx() { } } -// ErrTimedOut is returned when we attempt to ping the server, and time out +// ErrTimedOut is returned when we attempt to ping the server, and timed out // before receiving a PONG back. -var ErrTimedOut = errors.New("timed out during ping to server") +type ErrTimedOut struct { + // TimeSinceSuccess is how long ago we received a successful pong. + TimeSinceSuccess time.Duration + // LastPong is the time we received our last successful pong. + LastPong time.Time + // LastPong is the last time we sent a pong request. + LastPing time.Time + // Delay is the configured delay between how often we send a ping request. + Delay time.Duration +} + +func (ErrTimedOut) Error() string { return "timed out during ping to server" } + +func (c *Client) pingLoop(errs chan error, done chan struct{}, wg *sync.WaitGroup) { + c.debug.Print("starting pingLoop") + defer c.debug.Print("closing pingLoop") -func (c *Client) pingLoop(errs chan error, done chan struct{}) { c.conn.mu.Lock() c.conn.lastPing = time.Now() c.conn.lastPong = time.Now() @@ -339,29 +474,39 @@ func (c *Client) pingLoop(errs chan error, done chan struct{}) { // Delay for 30 seconds during connect to wait for the client to register // and what not. - time.Sleep(20 * time.Second) + time.Sleep(10 * time.Second) tick := time.NewTicker(c.Config.PingDelay) defer tick.Stop() for { select { - case <-done: - return case <-tick.C: c.conn.mu.RLock() - defer c.conn.mu.RUnlock() if time.Since(c.conn.lastPong) > c.Config.PingDelay+(60*time.Second) { // It's 60 seconds over what out ping delay is, connection // has probably dropped. - errs <- ErrTimedOut + errs <- ErrTimedOut{ + TimeSinceSuccess: time.Since(c.conn.lastPong), + LastPong: c.conn.lastPong, + LastPing: c.conn.lastPing, + Delay: c.Config.PingDelay, + } + + wg.Done() + c.conn.mu.RUnlock() return } + c.conn.mu.RUnlock() c.conn.mu.Lock() c.conn.lastPing = time.Now() c.conn.mu.Unlock() + c.Commands.Ping(fmt.Sprintf("%d", time.Now().UnixNano())) + case <-done: + wg.Done() + return } } }