lopipe: elasticsearch logging

This commit is contained in:
user 2023-10-23 22:17:34 +05:30 committed by SkyperTHC
parent 71cc7a77a0
commit a5d69d5fe2
No known key found for this signature in database
GPG Key ID: A9BD386DF9113CD6
17 changed files with 365 additions and 2 deletions

@ -0,0 +1,6 @@
metric_logger_queue_size: 1000
metric_flush_interval: 1 #in minutes
elastic_server_host: "sf-stats.net"
elastic_index_name: "sf-stats"
elastic_username: ""
elastic_password: ""

@ -99,6 +99,18 @@ services:
- "${SF_BASEDIR:-.}/config:${SF_BASEDIR:-.}/config"
- "/sys/fs/cgroup:/sys/fs/cgroup"
sf-logpipe:
build: tools/logpipe
image: sf-logpipe
container_name: sf-logpipe
restart: ${SF_RESTART:-on-failure}
cgroup_parent: sf.slice
pid: "host"
network_mode: host
volumes:
- "${SF_BASEDIR:-.}/config/etc/logpipe/config.yaml:/app/config.yaml:ro"
- /dev/shm/sf/run/logpipe/:/app/sock/:rw
sf-portd:
build: encfsd
image: sf-encfsd

@ -1,4 +1,4 @@
all: albuild fs-root/bin/docker-exec-sigproxy fs-root/usr/sbin/sshd Dockerfile
all: albuild fs-root/bin/docker-exec-sigproxy fs-root/bin/unix-socket-client fs-root/usr/sbin/sshd Dockerfile
docker build --no-cache --network host -t sf-host .
albuild:
@ -15,6 +15,10 @@ fs-root/bin/docker-exec-sigproxy: docker-exec-sigproxy.c
docker run --rm -v$$(pwd):/src -w /src alpine-gcc gcc -Wall -O2 -o fs-root/bin/docker-exec-sigproxy docker-exec-sigproxy.c
@echo SUCCESS
fs-root/bin/unix-socket-client: unix-socket-client.c
docker run --rm -v$$(pwd):/src -w /src alpine-gcc gcc -Wall -O2 -o fs-root/bin/unix-socket-client unix-socket-client.c
@echo SUCCESS
diff:
cd dev && \
diff -x '!*.[ch]' -u openssh-9.2p1-orig/ openssh-9.2p1-sf/ | grep -Ev ^"(Only in|Common)" >../sf-sshd.patch

@ -1319,6 +1319,9 @@ tofile "${C_IP:?}" "/config/self-for-guest/lg-${LID}/c_ip"
echo_pty -e "....[${CG}OK${CN}]"
# Add a log entry into elasticsearch using logpipe
echo "LID:${LID}|Hostname:${SF_HOSTNAME}|" | unix-socket-client &> /dev/null
# Spawn shell
spawn_shell_exit "$@"
# NOT REACHED

@ -2,7 +2,7 @@
# Fixing vmbox permissions
chmod 755 /etc /usr /bin /bin/segfaultsh /bin/webshellsh /bin/asksecsh /bin/mmdbinspect /bin/docker-exec-sigproxy
chmod 755 /etc /usr /bin /bin/segfaultsh /bin/webshellsh /bin/asksecsh /bin/mmdbinspect /bin/docker-exec-sigproxy /bin/unix-socket-client
chmod 644 /etc/english.txt
echo -e "/bin/segfaultsh\n/bin/webshellsh" >>/etc/shells

47
host/unix-socket-client.c Normal file

@ -0,0 +1,47 @@
// Problem: Alpine's nc does not support connecting to unix sockets, this was needed for logpipe(see tools/logpipe).
// This utility connects to logpipe's socket, writes whatever it receives on stdin and then exits.
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* memset, strncpy — previously missing (implicit declarations) */
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Path of the unix socket logpipe listens on (see tools/logpipe). */
#define SOCKET_PATH "/dev/shm/sf/run/logpipe/logPipe.sock"

/* Write exactly len bytes to fd, retrying on short writes.
 * Returns 0 on success, -1 on error (errno set by write). */
static int write_all(int fd, const char *buf, size_t len) {
    while (len > 0) {
        ssize_t n = write(fd, buf, len);
        if (n == -1)
            return -1;
        buf += n;
        len -= (size_t)n;
    }
    return 0;
}

int main(int argc, char *argv[]) {
    int sockfd;
    struct sockaddr_un addr;

    sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) {
        perror("connect");
        exit(EXIT_FAILURE);
    }

    /* Forward stdin to the socket until EOF. */
    char buf[1024];
    ssize_t nread;
    while ((nread = read(STDIN_FILENO, buf, sizeof(buf))) > 0) {
        if (write_all(sockfd, buf, (size_t)nread) == -1) {
            perror("write");
            exit(EXIT_FAILURE);
        }
    }
    if (nread == -1) {
        perror("read");
        exit(EXIT_FAILURE);
    }

    close(sockfd);
    return EXIT_SUCCESS;
}

11
tools/logpipe/Dockerfile Normal file

@ -0,0 +1,11 @@
# Build stage: compile the logpipe binary with the Go toolchain.
FROM golang:latest AS build
WORKDIR /build
# Copy module files first so the dependency download is cached
# across source-only changes.
COPY go.mod go.sum ./
RUN go mod download
COPY *.go ./
# Static build (CGO disabled, stripped DWARF via -w) so the binary
# runs on plain Alpine without glibc.
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -tags prod -ldflags '-w'
# Runtime stage: minimal Alpine image containing only the binary.
FROM alpine:latest
WORKDIR /app/
COPY --from=build /build/logpipe /app/logpipe
ENTRYPOINT ["/app/logpipe"]

20
tools/logpipe/README.md Normal file

@ -0,0 +1,20 @@
### LogPipe
Accepts log entries over a unix socket and ships them to Elasticsearch
### Building
- `go build -o bin/logpipe`
or via docker
- `docker build -t sf-logpipe .`
- `docker-compose up -d`
### Adding a log entry
- Start the program - `./bin/logpipe` (ignore if using docker)
- Then run `echo "Myattribute:MyValue|Myattribute2:MyValue2|" | nc -U ./logPipe.sock`
or
- `echo "Myattribute:MyValue|Myattribute2:MyValue2|" | unix-socket-client`
Notes:
- Log format is "attr:val|", each attribute-value pair must be terminated with a pipe(|)
- Timestamp is automatically added to log entries
- Elasticsearch credentials must be configured in `config.yaml`
- Program/Container must be restarted after config changes

@ -0,0 +1,5 @@
### Compilation
`gcc client.c -o sock_client`
### Usage
`echo "Myattribute:MyValue|Myattribute2:MyValue2|" | ./sock_client`

@ -0,0 +1,44 @@
/* Connects to logpipe's unix socket, forwards stdin to it, then exits. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* memset, strncpy — previously missing (implicit declarations) */
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Path of the unix socket logpipe listens on. */
#define SOCKET_PATH "/dev/shm/sf/run/logpipe/logPipe.sock"

/* Write exactly len bytes to fd, retrying on short writes.
 * Returns 0 on success, -1 on error (errno set by write). */
static int write_all(int fd, const char *buf, size_t len) {
    while (len > 0) {
        ssize_t n = write(fd, buf, len);
        if (n == -1)
            return -1;
        buf += n;
        len -= (size_t)n;
    }
    return 0;
}

int main(int argc, char *argv[]) {
    int sockfd;
    struct sockaddr_un addr;

    sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) {
        perror("connect");
        exit(EXIT_FAILURE);
    }

    /* Forward stdin to the socket until EOF. */
    char buf[1024];
    ssize_t nread;
    while ((nread = read(STDIN_FILENO, buf, sizeof(buf))) > 0) {
        if (write_all(sockfd, buf, (size_t)nread) == -1) {
            perror("write");
            exit(EXIT_FAILURE);
        }
    }
    if (nread == -1) {
        perror("read");
        exit(EXIT_FAILURE);
    }

    close(sockfd);
    return EXIT_SUCCESS;
}

@ -0,0 +1,6 @@
metric_logger_queue_size: 1000
metric_flush_interval: 1 #in minutes
elastic_server_host: "sf-stats.net"
elastic_index_name: "sf-stats"
elastic_username: ""
elastic_password: ""

@ -0,0 +1,6 @@
services:
sf-logpipe:
image: sf-logpipe
volumes:
- /dev/shm/sf/run/logpipe/:/app/sock/:rw
- ./config.yaml:/app/config.yaml:ro

5
tools/logpipe/go.mod Normal file

@ -0,0 +1,5 @@
module logpipe
go 1.21.1
require gopkg.in/yaml.v2 v2.4.0

4
tools/logpipe/go.sum Normal file

@ -0,0 +1,4 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

73
tools/logpipe/main.go Normal file

@ -0,0 +1,73 @@
package main
import (
"fmt"
"log"
"net"
"os"
"gopkg.in/yaml.v2"
)
// LogPipe holds the settings read from config.yaml.
type LogPipe struct {
	MetricLoggerQueueSize int    `yaml:"metric_logger_queue_size"` // capacity of the in-memory log queue
	MetricFlushInterval   int    `yaml:"metric_flush_interval"`    // flush period (config.yaml comments say minutes)
	ElasticServerHost     string `yaml:"elastic_server_host"`
	ElasticIndexName      string `yaml:"elastic_index_name"`
	ElasticUsername       string `yaml:"elastic_username"`
	ElasticPassword       string `yaml:"elastic_password"`
}
// main loads config.yaml, starts the metric logger, and then serves the
// unix socket that receives log entries (blocks forever).
func main() {
	fbytes, ferr := os.ReadFile("config.yaml")
	if ferr != nil {
		log.Println("Could not read config.yaml")
		return
	}
	lp := LogPipe{}
	if err := yaml.Unmarshal(fbytes, &lp); err != nil {
		log.Println("Failed Unmarshal data", err)
	}
	// Guard against an unset/zero interval so the flusher never spins.
	if lp.MetricFlushInterval <= 0 {
		lp.MetricFlushInterval = 1
	}
	// BUG FIX: the configured flush interval was previously ignored —
	// a literal 1 was passed instead of lp.MetricFlushInterval.
	MLogger.StartLogger(lp.MetricLoggerQueueSize, lp.MetricFlushInterval,
		lp.ElasticServerHost, lp.ElasticIndexName,
		lp.ElasticUsername, lp.ElasticPassword)
	log.Println("Listening on socket logPipe.sock")
	listenOnSocket("./sock/logPipe.sock")
}
func listenOnSocket(socketFile string) {
os.Remove(socketFile)
listener, err := net.Listen("unix", socketFile)
if err != nil {
fmt.Println("Error creating listener:", err)
return
}
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("Error accepting connection:", err)
continue
}
go handleConnection(conn)
}
}
// handleConnection reads one log record from the connection (until the
// client closes its end) and queues it with the metric logger.
func handleConnection(conn net.Conn) {
	defer conn.Close()
	// BUG FIX: the old code did string(buf) on the full 2048-byte buffer,
	// which appended NUL padding to every entry, and a single Read could
	// miss data sent in multiple chunks. Read until EOF and keep only the
	// bytes actually received.
	data := make([]byte, 0, 2048)
	buf := make([]byte, 2048)
	for {
		n, err := conn.Read(buf)
		if n > 0 {
			data = append(data, buf[:n]...)
		}
		if err != nil {
			// io.EOF is the normal end of a record; anything else also
			// ends the read, and whatever was received is still logged.
			break
		}
	}
	if len(data) == 0 {
		log.Println("Error reading from connection: empty payload")
		return
	}
	logStr := string(data)
	MLogger.AddLogEntry(&logStr)
}

117
tools/logpipe/metrics.go Normal file

@ -0,0 +1,117 @@
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync/atomic"
	"time"
)

// MetricLogger batches log entries in memory and periodically bulk-inserts
// them into an Elasticsearch index.
type MetricLogger struct {
	LogQueue         chan *string // Queue of Metrics that is to be flushed
	LoggingActive    *atomic.Bool // gate: when false, AddLogEntry drops entries
	FlushInterval    time.Duration
	ElasticServerUrl string // https://user:pass@host — credentials become Basic Auth
	ElasticIndexName string
}

// MLogger is the process-wide logger instance configured by StartLogger.
var MLogger = MetricLogger{}

// StartLogger initialises the queue and connection settings and launches
// the background flusher goroutine. flushInterval is in minutes, matching
// the "#in minutes" note in config.yaml.
func (metricLogger *MetricLogger) StartLogger(queueSize int, flushInterval int,
	elasticServerHost string, elasticIndexName string,
	elasticUsername string, elasticPassword string) {
	metricLogger.LogQueue = make(chan *string, queueSize)
	metricLogger.LoggingActive = &atomic.Bool{}
	metricLogger.LoggingActive.Store(true)
	// BUG FIX: config.yaml documents the interval in minutes, but it was
	// multiplied by time.Second.
	metricLogger.FlushInterval = time.Minute * time.Duration(flushInterval)
	metricLogger.ElasticIndexName = elasticIndexName
	// Embedding the credentials in the URL makes net/http send them as a
	// Basic-Auth header.
	metricLogger.ElasticServerUrl = fmt.Sprintf("https://%s:%s@%s", elasticUsername, elasticPassword, elasticServerHost)
	go metricLogger.periodicFlush()
}

// AddLogEntry parses a raw "attr:val|attr2:val2|" record, stamps it with
// the current time, JSON-encodes it and queues it for the next flush.
// Entries are dropped when logging is inactive or the queue is full.
func (metricLogger *MetricLogger) AddLogEntry(log *string) {
	if !metricLogger.LoggingActive.Load() {
		return
	}
	logEntry := make(map[string]string)
	logEntry["Time"] = time.Now().Format(time.RFC3339)
	// Each attribute-value pair is terminated by '|'.
	for _, section := range strings.Split(*log, "|") {
		// BUG FIX: SplitN keeps values that themselves contain ':' intact;
		// the old Split silently dropped any section with more than one colon.
		parts := strings.SplitN(section, ":", 2)
		if len(parts) == 2 {
			logEntry[parts[0]] = parts[1]
		}
	}
	logBytes, jerr := json.Marshal(logEntry)
	if jerr != nil {
		return
	}
	logStr := string(logBytes)
	select {
	case metricLogger.LogQueue <- &logStr:
	default: // Channel full: drop rather than block the caller.
	}
}

// periodicFlush flushes the queue every FlushInterval, forever.
func (metricLogger *MetricLogger) periodicFlush() {
	for {
		time.Sleep(metricLogger.FlushInterval)
		metricLogger.FlushQueue()
	}
}

// FlushQueue drains everything currently queued and ships it to
// Elasticsearch as a single _bulk request.
func (metricLogger *MetricLogger) FlushQueue() {
	logData := strings.Builder{}
outer:
	for { // Flush everything in the queue
		select {
		case logEntry, ok := <-metricLogger.LogQueue:
			if !ok {
				// BUG FIX: a bare "break" only left the select statement,
				// so a closed channel made this loop spin forever.
				break outer
			}
			// NDJSON bulk body: action line, then the document,
			// each newline-terminated.
			logData.WriteString(`{ "index":{} }`)
			logData.WriteByte('\n')
			logData.WriteString(*logEntry)
			logData.WriteByte('\n')
		default:
			break outer // queue drained
		}
	}
	if logData.Len() > 0 {
		// Failed batches are reported and dropped, not requeued.
		if err := metricLogger.Insert(logData.String()); err != nil {
			fmt.Println("logpipe: flush failed:", err)
		}
	}
}

// Insert POSTs Data to the index's _bulk endpoint. Returns nil when
// Elasticsearch answers with any 2xx status.
func (metricLogger *MetricLogger) Insert(Data string) error {
	// A nil Transport means http.DefaultTransport, so connections are still
	// pooled across calls; the timeout bounds a wedged Elasticsearch.
	client := &http.Client{Timeout: 30 * time.Second}
	req, err := http.NewRequest("POST", metricLogger.ElasticServerUrl+"/"+
		metricLogger.ElasticIndexName+"/_bulk", strings.NewReader(Data))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// BUG FIX: the _bulk API answers 200 OK, not 201 Created, so the old
	// "== 201" check reported every successful flush as a failure.
	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		return nil
	}
	return fmt.Errorf("bulk insert failed: %s", resp.Status)
}