6
1
mirror of https://git.mills.io/saltyim/saltyim.git synced 2024-06-16 03:48:24 +00:00
prologic-saltyim/internal/server.go

390 lines
8.7 KiB
Go

package internal
import (
"context"
"fmt"
"net"
"net/http"
"path/filepath"
"time"
"git.mills.io/prologic/msgbus"
"git.mills.io/prologic/observe"
"github.com/NYTimes/gziphandler"
"github.com/julienschmidt/httprouter"
"github.com/justinas/nosurf"
"github.com/robfig/cron"
log "github.com/sirupsen/logrus"
"github.com/unrolled/logger"
"golang.org/x/crypto/acme/autocert"
"go.salty.im/saltyim"
)
const (
acmeDir = "acme"
logsDir = "logs"
servicesIdentity = "svc.key"
)
var (
metrics *observe.Metrics
)
func init() {
metrics = observe.NewMetrics("twtd")
}
// Server ...
type Server struct {
bind string
config *Config
router *Router
server *http.Server
// Message Bus
bus *msgbus.MessageBus
// Services User
svc *saltyim.Service
// Data Store
db Store
// Scheduler
cron *cron.Cron
// API
api *API
}
// AddRoute ...
func (s *Server) AddRoute(method, path string, handler http.Handler) {
s.router.Handler(method, path, handler)
}
// AddShutdownHook ...
func (s *Server) AddShutdownHook(f func()) {
s.server.RegisterOnShutdown(f)
}
// Shutdown ...
func (s *Server) Shutdown(ctx context.Context) error {
s.cron.Stop()
if err := s.server.Shutdown(ctx); err != nil {
log.WithError(err).Error("error shutting down server")
return err
}
if err := s.db.Close(); err != nil {
log.WithError(err).Error("error closing store")
return err
}
return nil
}
// Run ...
func (s *Server) Run(ctx context.Context) (err error) {
idleConnsClosed := make(chan struct{})
go func() {
if err = s.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
log.WithError(err).Fatal("HTTP server ListenAndServe")
}
}()
go func() {
if err := s.svc.Run(ctx); err != nil {
log.WithError(err).Error("error running service user")
}
}()
<-ctx.Done()
log.Info("Shutting down...")
if err = s.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.WithError(err).Fatal("Error shutting down HTTP server")
}
close(idleConnsClosed)
<-idleConnsClosed
return
}
// ListenAndServe ...
func (s *Server) ListenAndServe() error {
_, port, err := net.SplitHostPort(s.bind)
if err != nil {
log.WithError(err).Errorf("error parsing bind hostport %s", s.bind)
return err
}
useLetsEncrypt := s.config.TLSKey == "" && s.config.TLSCert == ""
if s.config.TLS {
if useLetsEncrypt && (port == "443" || port == "https") {
log.Info("Setting up Lets Encrypt ...")
m := &autocert.Manager{
Cache: autocert.DirCache(filepath.Join(s.config.Data, acmeDir)),
Prompt: autocert.AcceptTOS,
Email: s.config.SupportEmail,
HostPolicy: autocert.HostWhitelist(s.config.baseURL.Hostname()),
}
s.server.TLSConfig = m.TLSConfig()
httpServer := &http.Server{
Addr: ":http",
Handler: logger.New(logger.Options{
Prefix: "yarnd-http",
RemoteAddressHeaders: []string{"X-Forwarded-For"},
}).Handler(m.HTTPHandler(nil)),
}
go func() {
if err := httpServer.ListenAndServe(); err != nil {
log.WithError(err).Fatalf("error running http server")
}
}()
return s.server.ListenAndServeTLS("", "")
}
log.Infof("Setting up TLS (key=%s cert=%s)", s.config.TLSKey, s.config.TLSCert)
return s.server.ListenAndServeTLS(s.config.TLSCert, s.config.TLSKey)
}
log.Warn("No TLS configured")
return s.server.ListenAndServe()
}
// AddCronJob ...
func (s *Server) AddCronJob(spec string, job cron.Job) error {
return s.cron.AddJob(spec, job)
}
func (s *Server) setupMetrics() {
ctime := time.Now()
// server uptime counter
metrics.NewCounterFunc(
"server", "uptime",
"Number of nanoseconds the server has been running",
func() float64 {
return float64(time.Since(ctime).Nanoseconds())
},
)
// server info
metrics.NewGaugeVec(
"server", "info",
"Server information",
[]string{"full_version", "version", "commit"},
)
metrics.GaugeVec("server", "info").
With(map[string]string{
"full_version": saltyim.FullVersion(),
"version": saltyim.Version,
"commit": saltyim.Commit,
}).Set(1)
s.AddRoute("GET", "/metrics", metrics.Handler())
}
func (s *Server) setupCronJobs() error {
InitJobs(s.config)
for name, jobSpec := range Jobs {
if jobSpec.Schedule == "" {
continue
}
job := jobSpec.Factory(s.config, s.db)
if err := s.cron.AddJob(jobSpec.Schedule, job); err != nil {
return fmt.Errorf("invalid cron schedule for job %s: %v (see https://pkg.go.dev/github.com/robfig/cron)", name, err)
}
log.Infof("Started background job %s (%s)", name, jobSpec.Schedule)
}
return nil
}
func (s *Server) setupServiceUser() error {
svcUser := fmt.Sprintf("%s@%s", serviceUser, s.config.PrimaryDomain)
svcUserState := filepath.Join(s.config.Data, fmt.Sprintf("%s.json", serviceUser))
log.Infof("starting service user %s", svcUser)
// create our addr
me, err := saltyim.ParseAddr(svcUser)
if err != nil {
return err
}
// create or load client for services user
fn := filepath.Join(s.config.Data, servicesIdentity)
id, err := saltyim.GetOrCreateIdentity(
saltyim.WithIdentityPath(fn),
saltyim.WithIdentityAddr(me),
)
if err != nil {
return err
}
if err := CreateConfig(s.config, me.Hash(), id.Key().ID().String()); err != nil {
if err != ErrAddressExists {
return err
}
}
svc, err := saltyim.NewService(me, id, svcUserState)
if err != nil {
return err
}
s.svc = svc
return nil
}
func (s *Server) runStartupJobs() {
log.Info("running startup jobs")
for name, jobSpec := range StartupJobs {
job := jobSpec.Factory(s.config, s.db)
log.Infof("running %s now...", name)
job.Run()
}
// Merge store
if err := s.db.Merge(); err != nil {
log.WithError(err).Error("error merging store")
}
}
func (s *Server) initRoutes() {
// Discovery
s.router.GET("/.well-known/salty/:config", s.ConfigHandler())
// Inbox Endpoint(s)
s.router.Handler(http.MethodGet, "/inbox/:inbox", http.StripPrefix("/inbox", s.bus))
s.router.Handler(http.MethodPost, "/inbox/:inbox", http.StripPrefix("/inbox", s.bus))
// Avatar Service
s.router.GET("/avatar/:hash", s.AvatarHandler())
}
// NewServer ...
func NewServer(bind string, options ...Option) (*Server, error) {
config := NewConfig()
for _, opt := range options {
if err := opt(config); err != nil {
return nil, err
}
}
if err := config.Validate(); err != nil {
log.WithError(err).Error("error validating config")
return nil, fmt.Errorf("error validating config: %w", err)
}
bus, err := msgbus.NewMessageBus(
msgbus.WithLogPath(filepath.Join(config.Data, logsDir)),
)
if err != nil {
log.WithError(err).Error("error creating message bus")
return nil, err
}
db, err := NewStore(config.Store)
if err != nil {
log.WithError(err).Error("error creating store")
return nil, err
}
if err := db.Merge(); err != nil {
log.WithError(err).Error("error merging store")
return nil, err
}
router := NewRouter()
router.Use(Middleware(func(next httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "*")
w.Header().Set("Access-Control-Expose-Headers", "*")
next(w, r, p)
}
}))
api := NewAPI(router, config, db)
csrfHandler := nosurf.New(router)
csrfHandler.ExemptGlob("/api/v1/*")
csrfHandler.ExemptGlob("/api/v1/blob/*")
csrfHandler.ExemptGlob("/.well-known/*")
csrfHandler.ExemptGlob("/inbox/*")
server := &Server{
bind: bind,
config: config,
router: router,
server: &http.Server{
Addr: bind,
Handler: logger.New(logger.Options{
Prefix: "saltyd",
RemoteAddressHeaders: []string{"X-Forwarded-For"},
}).Handler(gziphandler.GzipHandler(
csrfHandler,
),
),
},
// Bus
bus: bus,
// API
api: api,
// Data Store
db: db,
// Schedular
cron: cron.New(),
}
if err := server.setupCronJobs(); err != nil {
log.WithError(err).Error("error setting up background jobs")
return nil, err
}
server.cron.Start()
log.Info("started background jobs")
if err := server.setupServiceUser(); err != nil {
log.WithError(err).Error("error setting up service user")
return nil, err
}
log.Info("succeessfully setup service user")
server.setupMetrics()
log.Infof("serving metrics endpoint at %s/metrics", server.config.BaseURL)
// Log interesting configuration options
log.Infof("Debug: %t", server.config.Debug)
log.Infof("Base URL: %s", server.config.BaseURL)
log.Infof("Primary Domain: %s", server.config.PrimaryDomain)
log.Infof("Admin User: %s", server.config.AdminUser)
log.Infof("Support Email: %s", server.config.SupportEmail)
api.initRoutes()
server.initRoutes()
go server.runStartupJobs()
return server, nil
}