refractor quite a bit of connect logic, and errors

This commit is contained in:
Liam Stanley 2017-04-18 11:06:19 -04:00
parent 9cb3f3c522
commit 6bbd53edd8
2 changed files with 206 additions and 53 deletions

View File

@ -235,21 +235,29 @@ func (c *Client) String() string {
// execute forever. Use Client.Quit() first if you want to disconnect the
// client from the server/connection gracefully.
func (c *Client) Close(sendQuit bool) {
c.RunHandlers(&Event{Command: STOPPED, Trailing: c.Server()})
if sendQuit {
c.Send(&Event{Command: QUIT, Trailing: "closing"})
// Give ourselves a bit of padding so we can let everyone know we're
// quitting.
time.Sleep(2 * time.Second)
}
_ = c.conn.Close()
c.RunHandlers(&Event{Command: STOPPED, Trailing: c.Server()})
}
func (c *Client) execLoop(done chan struct{}) {
func (c *Client) execLoop(done chan struct{}, wg *sync.WaitGroup) {
c.debug.Print("starting execLoop")
defer c.debug.Print("closing execLoop")
for {
select {
case <-done:
wg.Done()
return
case event := <-c.rx:
c.RunHandlers(event)
case <-done:
return
}
}
}

243
conn.go
View File

@ -7,7 +7,6 @@ package girc
import (
"bufio"
"crypto/tls"
"errors"
"fmt"
"net"
"net/url"
@ -47,11 +46,29 @@ type ircConn struct {
pingDelay time.Duration
}
// ErrInvalidConfig is returned when the configuration passed to the client
// is invalid.
type ErrInvalidConfig struct {
Conf Config // Conf is the configuration that was not valid.
err error
}
func (e ErrInvalidConfig) Error() string { return "invalid configuration: " + e.err.Error() }
// ErrProxy is returned when an attempt to use the supplied proxy resulted
// in error, with implementation or connection.
type ErrProxy struct {
Bind string // Bind is the query string address that was supplied.
err error
}
func (e ErrProxy) Error() string { return fmt.Sprintf("proxy error: %q: %s", e.Bind, e.err) }
// newConn sets up and returns a new connection to the server. This includes
// setting up things like proxies, ssl/tls, and other misc. things.
func newConn(conf Config, addr string) (*ircConn, error) {
if err := conf.isValid(); err != nil {
return nil, fmt.Errorf("invalid configuration: %s", err)
return nil, ErrInvalidConfig{conf, err}
}
var conn net.Conn
@ -63,7 +80,7 @@ func newConn(conf Config, addr string) (*ircConn, error) {
var local *net.TCPAddr
local, err = net.ResolveTCPAddr("tcp", conf.Bind+":0")
if err != nil {
return nil, fmt.Errorf("unable to resolve bind address %s: %s", conf.Bind, err)
return nil, err
}
dialer.LocalAddr = local
@ -73,24 +90,20 @@ func newConn(conf Config, addr string) (*ircConn, error) {
var proxyURI *url.URL
var proxyDialer proxy.Dialer
proxyURI, err = url.Parse(conf.Proxy)
if err != nil {
return nil, fmt.Errorf("unable to use proxy %q: %s", conf.Proxy, err)
if proxyURI, err = url.Parse(conf.Proxy); err != nil {
return nil, ErrProxy{conf.Proxy, err}
}
proxyDialer, err = proxy.FromURL(proxyURI, dialer)
if err != nil {
return nil, fmt.Errorf("unable to use proxy %q: %s", conf.Proxy, err)
if proxyDialer, err = proxy.FromURL(proxyURI, dialer); err != nil {
return nil, ErrProxy{conf.Proxy, err}
}
conn, err = proxyDialer.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("unable to connect to proxy %q: %s", conf.Proxy, err)
if conn, err = proxyDialer.Dial("tcp", addr); err != nil {
return nil, ErrProxy{conf.Proxy, err}
}
} else {
conn, err = dialer.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("unable to connect to %q: %s", addr, err)
if conn, err = dialer.Dial("tcp", addr); err != nil {
return nil, ErrProxy{conf.Proxy, err}
}
}
@ -116,15 +129,33 @@ func newConn(conf Config, addr string) (*ircConn, error) {
return c, nil
}
func newMockConn(conn net.Conn) *ircConn {
ctime := time.Now()
c := &ircConn{
sock: conn,
connTime: &ctime,
connected: true,
}
c.newReadWriter()
return c
}
// ErrParseEvent is returned when an event cannot be parsed with ParseEvent().
type ErrParseEvent struct {
Line string
}
func (e ErrParseEvent) Error() string { return "unable to parse event: " + e.Line }
func (c *ircConn) decode() (event *Event, err error) {
line, err := c.io.ReadString(delim)
if err != nil {
return nil, err
}
event = ParseEvent(line)
if event == nil {
return nil, fmt.Errorf("unable to parse incoming event: %s", event)
if event = ParseEvent(line); event == nil {
return nil, ErrParseEvent{line}
}
return event, nil
@ -159,35 +190,97 @@ func (c *ircConn) Close() error {
return c.sock.Close()
}
// Connect attempts to connect to the given IRC server
// Connect attempts to connect to the given IRC server. Returns only when
// an error has occurred, or a disconnect was requested with Close(). Connect
// will only return once all goroutines have been closed to ensure there are
// no long-running routines becoming backed up. This also means that this
// will wait for all non-background handlers to complete.
func (c *Client) Connect() error {
return c.internalConnect(nil)
}
// MockConnect is used to implement mocking with an IRC server. Supply a net.Conn
// that will be used to spoof the server. A useful way to do this is to so
// net.Pipe(), pass one end into MockConnect(), and the other end into
// bufio.NewReader().
//
// For example:
//
// client := girc.New(girc.Config{
// Server: "dummy.int",
// Port: 6667,
// Nick: "test",
// User: "test",
// Name: "Testing123",
// })
//
// in, out := net.Pipe()
// defer in.Close()
// defer out.Close()
// b := bufio.NewReader(in)
//
// go func() {
// if err := client.MockConnect(out); err != nil {
// panic(err)
// }
// }()
//
// defer client.Close(false)
//
// for {
// in.SetReadDeadline(time.Now().Add(300 * time.Second))
// line, err := b.ReadString(byte('\n'))
// if err != nil {
// panic(err)
// }
//
// event := girc.ParseEvent(line)
//
// if event == nil {
// continue
// }
//
// // Do stuff with event here.
// }
func (c *Client) MockConnect(conn net.Conn) error {
return c.internalConnect(conn)
}
func (c *Client) internalConnect(mock net.Conn) error {
// We want to be the only one handling connects/disconnects right now.
c.cmux.Lock()
// Reset the state.
c.state = newState()
// Validate info, and actually make the connection.
c.debug.Printf("connecting to %s...", c.Server())
conn, err := newConn(c.Config, c.Server())
if err != nil {
c.cmux.Unlock()
return err
if mock == nil {
// Validate info, and actually make the connection.
c.debug.Printf("connecting to %s...", c.Server())
conn, err := newConn(c.Config, c.Server())
if err != nil {
c.cmux.Unlock()
return err
}
c.conn = conn
} else {
c.conn = newMockConn(mock)
}
c.conn = conn
c.cmux.Unlock()
// Start read loop to process messages from the server.
errs := make(chan error, 4)
errs := make(chan error, 3)
done := make(chan struct{}, 4)
defer close(errs)
defer close(done)
var wg sync.WaitGroup
// 4 being the number of goroutines we need to finish when this function
// returns.
wg.Add(4)
go c.execLoop(done)
go c.readLoop(errs, done)
go c.pingLoop(errs, done)
go c.sendLoop(errs, done)
go c.execLoop(done, &wg)
go c.readLoop(errs, done, &wg)
go c.pingLoop(errs, done, &wg)
go c.sendLoop(errs, done, &wg)
// Send a virtual event allowing hooks for successful socket connection.
c.RunHandlers(&Event{Command: INITIALIZED, Trailing: c.Server()})
@ -211,27 +304,50 @@ func (c *Client) Connect() error {
// support.
c.listCAP()
return <-errs
// Wait for the first error.
err := <-errs
c.conn.mu.Lock()
c.conn.connected = false
c.conn.mu.Unlock()
// Once we have our error, let all other functions know we're done.
c.debug.Print("waiting for all routines to finish")
close(done)
// Wait for all goroutines to finish.
wg.Wait()
close(errs)
// Make sure that the connection is closed if not already.
c.cmux.Lock()
_ = c.conn.Close()
c.cmux.Unlock()
return err
}
// readLoop sets a timeout of 300 seconds, and then attempts to read from the
// IRC server. If there is an error, it calls Reconnect.
func (c *Client) readLoop(errs chan error, done chan struct{}) {
func (c *Client) readLoop(errs chan error, done chan struct{}, wg *sync.WaitGroup) {
c.debug.Print("starting readLoop")
defer c.debug.Print("closing readLoop")
var event *Event
var err error
for {
select {
case <-done:
wg.Done()
return
default:
// c.conn.sock.SetDeadline(time.Now().Add(300 * time.Second))
event, err = c.conn.decode()
if err != nil {
// Attempt a reconnect (if applicable). If it fails, send
// the error to c.Config.HandleError to be dealt with, if
// the handler exists.
errs <- err
wg.Done()
return
}
c.rx <- event
@ -275,13 +391,14 @@ func (c *ircConn) rate(chars int) time.Duration {
return 0
}
func (c *Client) sendLoop(errs chan error, done chan struct{}) {
func (c *Client) sendLoop(errs chan error, done chan struct{}, wg *sync.WaitGroup) {
c.debug.Print("starting sendLoop")
defer c.debug.Print("closing sendLoop")
var err error
for {
select {
case <-done:
return
case event := <-c.tx:
// Log the event.
if !event.Sensitive {
@ -310,8 +427,12 @@ func (c *Client) sendLoop(errs chan error, done chan struct{}) {
if err != nil {
errs <- err
wg.Done()
return
}
case <-done:
wg.Done()
return
}
}
}
@ -327,11 +448,25 @@ func (c *Client) flushTx() {
}
}
// ErrTimedOut is returned when we attempt to ping the server, and time out
// ErrTimedOut is returned when we attempt to ping the server, and timed out
// before receiving a PONG back.
var ErrTimedOut = errors.New("timed out during ping to server")
type ErrTimedOut struct {
// TimeSinceSuccess is how long ago we received a successful pong.
TimeSinceSuccess time.Duration
// LastPong is the time we received our last successful pong.
LastPong time.Time
// LastPong is the last time we sent a pong request.
LastPing time.Time
// Delay is the configured delay between how often we send a ping request.
Delay time.Duration
}
func (ErrTimedOut) Error() string { return "timed out during ping to server" }
func (c *Client) pingLoop(errs chan error, done chan struct{}, wg *sync.WaitGroup) {
c.debug.Print("starting pingLoop")
defer c.debug.Print("closing pingLoop")
func (c *Client) pingLoop(errs chan error, done chan struct{}) {
c.conn.mu.Lock()
c.conn.lastPing = time.Now()
c.conn.lastPong = time.Now()
@ -339,29 +474,39 @@ func (c *Client) pingLoop(errs chan error, done chan struct{}) {
// Delay for 30 seconds during connect to wait for the client to register
// and what not.
time.Sleep(20 * time.Second)
time.Sleep(10 * time.Second)
tick := time.NewTicker(c.Config.PingDelay)
defer tick.Stop()
for {
select {
case <-done:
return
case <-tick.C:
c.conn.mu.RLock()
defer c.conn.mu.RUnlock()
if time.Since(c.conn.lastPong) > c.Config.PingDelay+(60*time.Second) {
// It's 60 seconds over what out ping delay is, connection
// has probably dropped.
errs <- ErrTimedOut
errs <- ErrTimedOut{
TimeSinceSuccess: time.Since(c.conn.lastPong),
LastPong: c.conn.lastPong,
LastPing: c.conn.lastPing,
Delay: c.Config.PingDelay,
}
wg.Done()
c.conn.mu.RUnlock()
return
}
c.conn.mu.RUnlock()
c.conn.mu.Lock()
c.conn.lastPing = time.Now()
c.conn.mu.Unlock()
c.Commands.Ping(fmt.Sprintf("%d", time.Now().UnixNano()))
case <-done:
wg.Done()
return
}
}
}