Merge pull request #38 from jlatt/performance

improve performance by using fewer goroutines per client
This commit is contained in:
Jeremy Latt 2014-04-15 21:25:12 -07:00
commit 79dc46dac8
6 changed files with 169 additions and 112 deletions

@ -7,9 +7,8 @@ import (
)
const (
LOGIN_TIMEOUT = time.Minute / 2 // how long the client has to login
IDLE_TIMEOUT = time.Minute // how long before a client is considered idle
QUIT_TIMEOUT = time.Minute // how long after idle before a client is kicked
IDLE_TIMEOUT = time.Minute // how long before a client is considered idle
QUIT_TIMEOUT = time.Minute // how long after idle before a client is kicked
)
type Client struct {
@ -19,14 +18,12 @@ type Client struct {
capabilities CapabilitySet
capState CapState
channels ChannelSet
commands chan Command
ctime time.Time
flags map[UserMode]bool
hasQuit bool
hops uint
hostname Name
idleTimer *time.Timer
loginTimer *time.Timer
nick Name
quitTimer *time.Timer
realname Text
@ -44,13 +41,12 @@ func NewClient(server *Server, conn net.Conn) *Client {
capState: CapNone,
capabilities: make(CapabilitySet),
channels: make(ChannelSet),
commands: make(chan Command),
ctime: now,
flags: make(map[UserMode]bool),
server: server,
socket: NewSocket(conn),
}
client.socket = NewSocket(conn, client.commands)
client.loginTimer = time.AfterFunc(LOGIN_TIMEOUT, client.connectionTimeout)
client.Touch()
go client.run()
return client
@ -61,8 +57,31 @@ func NewClient(server *Server, conn net.Conn) *Client {
//
func (client *Client) run() {
for command := range client.commands {
if checkPass, ok := command.(checkPasswordCommand); ok {
var command Command
var err error
var line string
// Set the hostname for this client. The client may later send a PROXY
// command from stunnel that sets the hostname to something more accurate.
client.send(NewProxyCommand(AddrLookupHostname(
client.socket.conn.RemoteAddr())))
for err == nil {
if line, err = client.socket.Read(); err != nil {
command = NewQuitCommand("connection closed")
} else if command, err = ParseCommand(line); err != nil {
switch err {
case ErrParseCommand:
client.Reply(RplNotice(client.server, client,
NewText("failed to parse command")))
case NotEnoughArgsError:
// TODO
}
continue
} else if checkPass, ok := command.(checkPasswordCommand); ok {
checkPass.LoadPassword(client.server)
// Block the client thread while handling a potentially expensive
// password bcrypt operation. Since the server is single-threaded
@ -71,13 +90,20 @@ func (client *Client) run() {
// completes. This could be a form of DoS if handled naively.
checkPass.CheckPassword()
}
command.SetClient(client)
client.server.commands <- command
client.send(command)
}
}
func (client *Client) send(command Command) {
command.SetClient(client)
client.server.commands <- command
}
// quit timer goroutine
func (client *Client) connectionTimeout() {
client.commands <- NewQuitCommand("connection timeout")
client.send(NewQuitCommand("connection timeout"))
}
//
@ -109,7 +135,7 @@ func (client *Client) Touch() {
}
func (client *Client) Idle() {
client.Reply(RplPing(client))
client.Reply(RplPing(client.server))
if client.quitTimer == nil {
client.quitTimer = time.AfterFunc(QUIT_TIMEOUT, client.connectionTimeout)
@ -123,7 +149,6 @@ func (client *Client) Register() {
return
}
client.registered = true
client.loginTimer.Stop()
client.Touch()
}
@ -140,9 +165,6 @@ func (client *Client) destroy() {
// clean up self
if client.loginTimer != nil {
client.loginTimer.Stop()
}
if client.idleTimer != nil {
client.idleTimer.Stop()
}
@ -242,11 +264,8 @@ func (client *Client) ChangeNickname(nickname Name) {
}
}
func (client *Client) Reply(reply string, args ...interface{}) {
if len(args) > 0 {
reply = fmt.Sprintf(reply, args...)
}
client.socket.Write(reply)
func (client *Client) Reply(reply string) error {
return client.socket.Write(reply)
}
func (client *Client) Quit(message Text) {

@ -702,7 +702,6 @@ func ParseProxyCommand(args []string) (Command, error) {
type AwayCommand struct {
BaseCommand
text Text
away bool
}
func ParseAwayCommand(args []string) (Command, error) {
@ -710,7 +709,6 @@ func ParseAwayCommand(args []string) (Command, error) {
if len(args) > 0 {
cmd.text = NewText(args[0])
cmd.away = true
}
return cmd, nil

68
irc/debug.go Normal file

@ -0,0 +1,68 @@
package irc
import (
"os"
"runtime"
"runtime/debug"
"runtime/pprof"
"time"
)
func (msg *DebugCommand) HandleServer(server *Server) {
client := msg.Client()
if !client.flags[Operator] {
return
}
switch msg.subCommand {
case "GCSTATS":
stats := debug.GCStats{
Pause: make([]time.Duration, 10),
PauseQuantiles: make([]time.Duration, 5),
}
debug.ReadGCStats(&stats)
server.Replyf(client, "last GC: %s", stats.LastGC.Format(time.RFC1123))
server.Replyf(client, "num GC: %d", stats.NumGC)
server.Replyf(client, "pause total: %s", stats.PauseTotal)
server.Replyf(client, "pause quantiles min%%: %s", stats.PauseQuantiles[0])
server.Replyf(client, "pause quantiles 25%%: %s", stats.PauseQuantiles[1])
server.Replyf(client, "pause quantiles 50%%: %s", stats.PauseQuantiles[2])
server.Replyf(client, "pause quantiles 75%%: %s", stats.PauseQuantiles[3])
server.Replyf(client, "pause quantiles max%%: %s", stats.PauseQuantiles[4])
case "NUMGOROUTINE":
count := runtime.NumGoroutine()
server.Replyf(client, "num goroutines: %d", count)
case "PROFILEHEAP":
profFile := "ergonomadic.mprof"
file, err := os.Create(profFile)
if err != nil {
server.Replyf(client, "error: %s", err)
break
}
defer file.Close()
pprof.Lookup("heap").WriteTo(file, 0)
server.Replyf(client, "written to %s", profFile)
case "STARTCPUPROFILE":
profFile := "ergonomadic.prof"
file, err := os.Create(profFile)
if err != nil {
server.Replyf(client, "error: %s", err)
break
}
if err := pprof.StartCPUProfile(file); err != nil {
defer file.Close()
server.Replyf(client, "error: %s", err)
break
}
server.Replyf(client, "CPU profile writing to %s", profFile)
case "STOPCPUPROFILE":
pprof.StopCPUProfile()
server.Reply(client, "CPU profiling stopped")
}
}

@ -151,8 +151,8 @@ func RplPing(target Identifiable) string {
return NewStringReply(nil, PING, ":%s", target.Nick())
}
func RplPong(client *Client) string {
return NewStringReply(nil, PONG, client.Nick().String())
func RplPong(client *Client, msg Text) string {
return NewStringReply(nil, PONG, "%s :%s", client.server, msg.String())
}
func RplQuit(client *Client, message Text) string {

@ -8,9 +8,6 @@ import (
"net"
"os"
"os/signal"
"runtime"
"runtime/debug"
"runtime/pprof"
"strings"
"syscall"
"time"
@ -121,7 +118,6 @@ func (server *Server) loadChannels() {
func (server *Server) processCommand(cmd Command) {
client := cmd.Client()
Log.debug.Printf("%s → %+v", client, cmd)
if !client.registered {
regCmd, ok := cmd.(RegServerCommand)
@ -138,6 +134,7 @@ func (server *Server) processCommand(cmd Command) {
client.ErrUnknownCommand(cmd.Code())
return
}
switch srvCmd.(type) {
case *PingCommand, *PongCommand:
client.Touch()
@ -149,6 +146,7 @@ func (server *Server) processCommand(cmd Command) {
client.Active()
client.Touch()
}
srvCmd.HandleServer(server)
}
@ -272,6 +270,14 @@ func (s *Server) Nick() Name {
return s.Id()
}
func (server *Server) Reply(target *Client, message string) {
target.Reply(RplPrivMsg(server, target, NewText(message)))
}
func (server *Server) Replyf(target *Client, format string, args ...interface{}) {
server.Reply(target, fmt.Sprintf(format, args...))
}
//
// registration commands
//
@ -344,7 +350,8 @@ func (m *PassCommand) HandleServer(s *Server) {
}
func (m *PingCommand) HandleServer(s *Server) {
m.Client().Reply(RplPong(m.Client()))
client := m.Client()
client.Reply(RplPong(client, m.server.Text()))
}
func (m *PongCommand) HandleServer(s *Server) {
@ -514,23 +521,33 @@ func (msg *OperCommand) HandleServer(server *Server) {
client.flags[Operator] = true
client.RplYoureOper()
client.RplUModeIs(client)
client.Reply(RplModeChanges(client, client, ModeChanges{&ModeChange{
mode: Operator,
op: Add,
}}))
}
func (msg *AwayCommand) HandleServer(server *Server) {
client := msg.Client()
if msg.away {
if len(msg.text) > 0 {
client.flags[Away] = true
} else {
delete(client.flags, Away)
}
client.awayMessage = msg.text
var op ModeOp
if client.flags[Away] {
op = Add
client.RplNowAway()
} else {
op = Remove
client.RplUnAway()
}
client.Reply(RplModeChanges(client, client, ModeChanges{&ModeChange{
mode: Away,
op: op,
}}))
}
func (msg *IsOnCommand) HandleServer(server *Server) {
@ -638,53 +655,6 @@ func (msg *NamesCommand) HandleServer(server *Server) {
}
}
func (server *Server) Reply(target *Client, format string, args ...interface{}) {
target.Reply(RplPrivMsg(server, target, NewText(fmt.Sprintf(format, args...))))
}
func (msg *DebugCommand) HandleServer(server *Server) {
client := msg.Client()
if !client.flags[Operator] {
return
}
switch msg.subCommand {
case "GC":
runtime.GC()
server.Reply(client, "OK")
case "GCSTATS":
stats := debug.GCStats{
Pause: make([]time.Duration, 10),
PauseQuantiles: make([]time.Duration, 5),
}
debug.ReadGCStats(&stats)
server.Reply(client, "last GC: %s", stats.LastGC.Format(time.RFC1123))
server.Reply(client, "num GC: %d", stats.NumGC)
server.Reply(client, "pause total: %s", stats.PauseTotal)
server.Reply(client, "pause quantiles min%%: %s", stats.PauseQuantiles[0])
server.Reply(client, "pause quantiles 25%%: %s", stats.PauseQuantiles[1])
server.Reply(client, "pause quantiles 50%%: %s", stats.PauseQuantiles[2])
server.Reply(client, "pause quantiles 75%%: %s", stats.PauseQuantiles[3])
server.Reply(client, "pause quantiles max%%: %s", stats.PauseQuantiles[4])
case "NUMGOROUTINE":
count := runtime.NumGoroutine()
server.Reply(client, "num goroutines: %d", count)
case "PROFILEHEAP":
profFile := "ergonomadic-heap.prof"
file, err := os.Create(profFile)
if err != nil {
log.Printf("error: %s", err)
break
}
defer file.Close()
pprof.Lookup("heap").WriteTo(file, 0)
server.Reply(client, "written to %s", profFile)
}
}
func (msg *VersionCommand) HandleServer(server *Server) {
client := msg.Client()
if (msg.target != "") && (msg.target != server.name) {

@ -7,25 +7,23 @@ import (
)
const (
R = '→'
W = '←'
EOF = ""
R = '→'
W = '←'
)
type Socket struct {
conn net.Conn
writer *bufio.Writer
closed bool
conn net.Conn
scanner *bufio.Scanner
writer *bufio.Writer
}
func NewSocket(conn net.Conn, commands chan<- Command) *Socket {
socket := &Socket{
conn: conn,
writer: bufio.NewWriter(conn),
func NewSocket(conn net.Conn) *Socket {
return &Socket{
conn: conn,
scanner: bufio.NewScanner(conn),
writer: bufio.NewWriter(conn),
}
go socket.readLines(commands)
return socket
}
func (socket *Socket) String() string {
@ -33,39 +31,43 @@ func (socket *Socket) String() string {
}
func (socket *Socket) Close() {
if socket.closed {
return
}
socket.closed = true
socket.conn.Close()
Log.debug.Printf("%s closed", socket)
}
func (socket *Socket) readLines(commands chan<- Command) {
commands <- NewProxyCommand(AddrLookupHostname(socket.conn.RemoteAddr()))
func (socket *Socket) Read() (line string, err error) {
if socket.closed {
err = io.EOF
return
}
scanner := bufio.NewScanner(socket.conn)
for scanner.Scan() {
line := scanner.Text()
for socket.scanner.Scan() {
line = socket.scanner.Text()
if len(line) == 0 {
continue
}
Log.debug.Printf("%s → %s", socket, line)
msg, err := ParseCommand(line)
if err != nil {
// TODO error messaging to client
continue
}
commands <- msg
return
}
if err := scanner.Err(); err != nil {
Log.debug.Printf("%s error: %s", socket, err)
err = socket.scanner.Err()
socket.isError(err, R)
if err == nil {
err = io.EOF
}
commands <- NewQuitCommand("connection closed")
close(commands)
return
}
func (socket *Socket) Write(line string) (err error) {
if socket.closed {
err = io.EOF
return
}
if _, err = socket.writer.WriteString(line); socket.isError(err, W) {
return
}