Skip to content

Commit

Permalink
Add sysinfo journalctl action to Sansshell
Browse files Browse the repository at this point in the history
  • Loading branch information
Peihao Chu committed Jul 28, 2023
1 parent ae4cd86 commit 86e9d37
Show file tree
Hide file tree
Showing 11 changed files with 1,283 additions and 36 deletions.
4 changes: 4 additions & 0 deletions cmd/sansshell-server/default-policy.rego
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ allow {
allow {
input.method = "/SysInfo.SysInfo/Dmesg"
}

allow {
input.method = "/SysInfo.SysInfo/Journal"
}
171 changes: 171 additions & 0 deletions services/sysinfo/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ package client

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/google/subcommands"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/Snowflake-Labs/sansshell/client"
pb "github.com/Snowflake-Labs/sansshell/services/sysinfo"
Expand All @@ -41,6 +46,7 @@ func (*sysinfoCmd) GetSubpackage(f *flag.FlagSet) *subcommands.Commander {
c := client.SetupSubpackage(subPackage, f)
c.Register(&uptimeCmd{}, "")
c.Register(&dmesgCmd{}, "")
c.Register(&journalCmd{}, "")
return c
}

Expand Down Expand Up @@ -187,3 +193,168 @@ func (p *dmesgCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interfa
}
return exit
}

type journalCmd struct {
since string
until string
tail int64
unit string
explain bool
output string
}

func (*journalCmd) Name() string { return "journalctl" }
func (*journalCmd) Synopsis() string { return "Get the log entries stored in journald" }
func (*journalCmd) Usage() string {
return `journalCtl [--since|--S=X] [--until|-U=X] [-tail=X] [-u|-unit=X] [-o|--output=X] [-x] :
Get the log entries stored in journald by systemd-journald.service
`
}

func (p *journalCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&p.since, "since", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter from")
f.StringVar(&p.since, "S", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter from (the date time is included)")
f.StringVar(&p.until, "until", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter until (the date time is not included)")
f.StringVar(&p.until, "U", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter until")
f.StringVar(&p.unit, "unit", "", "Sets systemd unit to filter messages")
f.StringVar(&p.output, "output", "", "Sets the format of the journal entries that will be shown. Right now only json and json-pretty are supported.")
f.BoolVar(&p.explain, "x", false, "If true, augment log lines with explanatory texts from the message catalog.")
f.Int64Var(&p.tail, "tail", 100, "If positive, the latest n records to fetch. By default, fetch latest 100 records. If negative, fetch all records")
}

func (p *journalCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
state := args[0].(*util.ExecuteState)
c := pb.NewSysInfoClientProxy(state.Conn)

// the output is case insensitive
p.output = strings.ToLower(p.output)
// currently output can only be json or json-pretty
if p.output != "" && p.output != "json" && p.output != "json-pretty" {
fmt.Fprintln(os.Stderr, "cannot set output to other formats unless json or json-pretty")
return subcommands.ExitUsageError
}

req := &pb.JournalRequest{
TailLine: int32(p.tail),
Explain: p.explain,
Unit: p.unit,
Output: p.output,
}
// Note: if timestamp is passed, don't forget to conver to UTC
expectedTimeFormat := "2006-01-02 15:04:05"
loc, err := time.LoadLocation("Local")
if err != nil {
fmt.Fprintln(os.Stderr, "cannot get local location")
return subcommands.ExitUsageError
}
if p.since != "" {
sinceTime, err := time.ParseInLocation(expectedTimeFormat, p.since, loc)
if err != nil {
fmt.Fprintln(os.Stderr, "please specify correct time pattern YYYY-MM-DD HH:MM:SS")
return subcommands.ExitUsageError
}
req.TimeSince = timestamppb.New(sinceTime.UTC())
}

if p.until != "" {
untilTime, err := time.ParseInLocation(expectedTimeFormat, p.until, loc)
if err != nil {
fmt.Fprintln(os.Stderr, "please specify correct time pattern YYYY-MM-DD HH:MM:SS")
return subcommands.ExitUsageError
}
req.TimeUntil = timestamppb.New(untilTime.UTC())
}

stream, err := c.JournalOneMany(ctx, req)
if err != nil {
// Emit this to every error file as it's not specific to a given target.
for _, e := range state.Err {
fmt.Fprintf(e, "All targets - could not info servers: %v\n", err)
}
return subcommands.ExitFailure
}

targetsDone := make(map[int]bool)
exit := subcommands.ExitSuccess
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
// Emit this to every error file as it's not specific to a given target.
// But...we only do this for targets that aren't complete. A complete target
// didn't have an error. i.e. we got N done then the context expired.
for i, e := range state.Err {
if !targetsDone[i] {
fmt.Fprintf(e, "Stream error: %v\n", err)
}
}
exit = subcommands.ExitFailure
break
}
for i, r := range resp {
if r.Error != nil && r.Error != io.EOF {
fmt.Fprintf(state.Err[r.Index], "Target %s (%d) returned error - %v\n", r.Target, r.Index, r.Error)
targetsDone[r.Index] = true
// If any target had errors it needs to be reported for that target but we still
// need to process responses off the channel. Final return code though should
// indicate something failed.
exit = subcommands.ExitFailure
continue
}
// At EOF this target is done.
if r.Error == io.EOF {
targetsDone[r.Index] = true
continue
}

switch t := r.Resp.Response.(type) {
case *pb.JournalReply_Journal:
journal := t.Journal
displayPid := ""
if journal.Pid != 0 {

displayPid = fmt.Sprintf("[%d]", journal.Pid)
}
fmt.Fprintf(state.Out[i], "[%s] %s %s%s: %s\n", journal.RealtimeTimestamp.AsTime().Local(), journal.Hostname, journal.SyslogIdentifier, displayPid, journal.Message)
// process explanatory texts if exists
if journal.Catalog != "" {
lines := strings.Split(journal.Catalog, "\n")
// if last line is empty, just remove it
if len(lines) > 0 && lines[len(lines)-1] == "" {
lines = lines[:len(lines)-1]
}
for idx := range lines {
lines[idx] = "-- " + lines[idx]
}
explainTexts := strings.Join(lines, "\n")
fmt.Fprintf(state.Out[i], "%s\n", explainTexts)
}
case *pb.JournalReply_JournalRaw:
journalRaw := t.JournalRaw
// Encode the map to JSON
var jsonData []byte
if p.output == "json" {
jsonData, err = json.Marshal(journalRaw.Entry)
if err != nil {
fmt.Fprintf(state.Err[r.Index], "Target %s (%d) returned cannot encode journal entry to JSON\n", r.Target, r.Index)
exit = subcommands.ExitFailure
continue
}
} else if p.output == "json-pretty" {
jsonData, err = json.MarshalIndent(journalRaw.Entry, "", " ")
if err != nil {
fmt.Fprintf(state.Err[r.Index], "Target %s (%d) returned cannot encode journal entry to pretty JSON\n", r.Target, r.Index)
exit = subcommands.ExitFailure
continue
}
}
// Convert the JSON data to a string
jsonString := string(jsonData)
fmt.Fprintf(state.Out[i], "%s\n", jsonString)
}
}
}
return exit
}
26 changes: 26 additions & 0 deletions services/sysinfo/server/sysinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
Description: "number of failures when performing sysinfo.Uptime"}
sysinfoDmesgFailureCounter = metrics.MetricDefinition{Name: "actions_sysinfo_dmesg_failure",
Description: "number of failures when performing sysinfo.Dmesg"}
sysinfoJournalFailureCounter = metrics.MetricDefinition{Name: "actions_sysinfo_journal_failure",
Description: "number of failures when performing sysinfo.Journal"}
)

// server is used to implement the gRPC server
Expand Down Expand Up @@ -113,6 +115,30 @@ func (s *server) Dmesg(req *pb.DmesgRequest, stream pb.SysInfo_DmesgServer) erro
return nil
}

func (s *server) Journal(req *pb.JournalRequest, stream pb.SysInfo_JournalServer) error {
ctx := stream.Context()
recorder := metrics.RecorderFromContextOrNoop(ctx)

// currently output can only be json or json-pretty
if req.Output != "" && req.Output != "json" && req.Output != "json-pretty" {
return status.Errorf(codes.InvalidArgument, "cannot set output to other formats unless json or json-pretty")
}

records, err := getJournalRecords(req)
if err != nil {
recorder.CounterOrLog(ctx, sysinfoUptimeFailureCounter, 1, attribute.String("reason", "get_journal_err"))
return err
}

for idx := len(records) - 1; idx >= 0; idx-- {
if err := stream.Send(records[idx]); err != nil {
recorder.CounterOrLog(ctx, sysinfoJournalFailureCounter, 1, attribute.String("reason", "stream_send_err"))
return status.Errorf(codes.Internal, "journal: send error %v", err)
}
}
return nil
}

// Register is called to expose this handler to the gRPC server
func (s *server) Register(gs *grpc.Server) {
pb.RegisterSysInfoServer(gs, s)
Expand Down
4 changes: 4 additions & 0 deletions services/sysinfo/server/sysinfo_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ var getUptime = func() (time.Duration, error) {
var getKernelMessages = func() ([]*pb.DmsgRecord, error) {
return nil, status.Errorf(codes.Unimplemented, "dmesg is not supported")
}

var getJournalRecords = func(req *pb.JournalRequest) ([]*pb.JournalReply, error) {
return nil, status.Errorf(codes.Unimplemented, "journal is not supported")
}
118 changes: 116 additions & 2 deletions services/sysinfo/server/sysinfo_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ Copyright (c) 2023 Snowflake Inc. All rights reserved.
package server

import (
"fmt"
"strconv"
"time"

pb "github.com/Snowflake-Labs/sansshell/services/sysinfo"
"github.com/coreos/go-systemd/v22/sdjournal"

Check failure on line 27 in services/sysinfo/server/sysinfo_linux.go

View workflow job for this annotation

GitHub Actions / lint

could not import github.com/coreos/go-systemd/v22/sdjournal (-: # github.com/coreos/go-systemd/v22/sdjournal
"github.com/euank/go-kmsg-parser/v2/kmsgparser"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
Expand All @@ -36,10 +37,23 @@ var getKmsgParser = func() (kmsgparser.Parser, error) {
return kmsgparser.NewParser()
}

// provide journal interface for testing purpose
// in testing file, we can mock these functions
type journal interface {
SeekTail() error
Close() error
Previous() (uint64, error)
GetEntry() (*sdjournal.JournalEntry, error)
GetCatalog() (string, error)
}

var getJournal = func() (journal, error) {
return sdjournal.NewJournal()
}

var getUptime = func() (time.Duration, error) {
sysinfo := &unix.Sysinfo_t{}
if err := unix.Sysinfo(sysinfo); err != nil {
fmt.Println(err)
return 0, status.Errorf(codes.Internal, "err in get the system info from unix")
}
uptime := time.Duration(sysinfo.Uptime) * time.Second
Expand Down Expand Up @@ -78,3 +92,103 @@ var getKernelMessages = func() ([]*pb.DmsgRecord, error) {
}
return records, nil
}

var getJournalRecords = func(req *pb.JournalRequest) ([]*pb.JournalReply, error) {
journal, err := getJournal()
if err != nil {
return nil, status.Errorf(codes.Internal, "sdjournal initializes Journal instance error: %v", err)
}
defer journal.Close()

// Seek to the end of the journal entries
journal.SeekTail()

var records []*pb.JournalReply
for {
// if collect expected log entries, break the loop
// if req.TailLine is a negative number or larger than number of entries in journal
// just fetch all journal entries
if len(records) == int(req.TailLine) {
break
}
// move read pointer back by on entry
n, err := journal.Previous()
if err != nil {
return nil, status.Errorf(codes.Internal, "sdjournal advances read pointer error: %v", err)
}
// break if there are no more entries left
if n == 0 {
break
}

entry, err := journal.GetEntry()
if err != nil {
return nil, status.Errorf(codes.Internal, "sdjournal gets journal entry error: %v", err)
}

// filter based on input time
realtime := timestamppb.New(time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond)))
// req.TimeSince <= realtime <= req.TimeUntil
if req.TimeSince != nil && req.TimeSince.AsTime().After(realtime.AsTime()) {
continue
}
if req.TimeUntil != nil && req.TimeUntil.AsTime().Before(realtime.AsTime()) {
continue
}

// filter based on systemd unit
// why `UNIT`, refer to doc: https://man.archlinux.org/man/systemd.journal-fields.7.en
if unit, systemdUnit := entry.Fields["UNIT"], entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]; req.Unit != "" &&
unit != req.Unit && systemdUnit != req.Unit {
continue
}

switch req.Output {
case "json", "json-pretty":
journalRecordRaw := &pb.JournalRecordRaw{}
journalRecordRaw.Entry = entry.Fields
entryMap := journalRecordRaw.Entry
// add cursor, realtime_timestamp and monotonic_timestamp to map
entryMap[sdjournal.SD_JOURNAL_FIELD_CURSOR] = entry.Cursor
entryMap[sdjournal.SD_JOURNAL_FIELD_REALTIME_TIMESTAMP] = strconv.FormatUint(entry.RealtimeTimestamp, 10)
entryMap[sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP] = strconv.FormatUint(entry.MonotonicTimestamp, 10)
records = append(records, &pb.JournalReply{
Response: &pb.JournalReply_JournalRaw{
JournalRaw: journalRecordRaw,
},
})
case "":
// default format
journalRecord := &pb.JournalRecord{}
journalRecord.RealtimeTimestamp = realtime
journalRecord.Hostname = entry.Fields[sdjournal.SD_JOURNAL_FIELD_HOSTNAME]
journalRecord.SyslogIdentifier = entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER]
journalRecord.Message = entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]

// some log entries may not have pid, since they are not generated by a process
if pidStr, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_PID]; ok {
pid, err := strconv.Atoi(pidStr)
if err != nil {
return nil, status.Errorf(codes.Internal, "sdjournal pid converts error: %v from string to int32", err)
}
journalRecord.Pid = int32(pid)
}

// augument the records
// can only add explanatory text for those log entries that have `MESSAGE_ID` field
if _, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE_ID]; ok && req.Explain {
catalog, err := journal.GetCatalog()
if err != nil {
return nil, status.Errorf(codes.Internal, "sdjournal getCatalog error: %v", err)
}
journalRecord.Catalog = catalog
}
records = append(records, &pb.JournalReply{
Response: &pb.JournalReply_Journal{
Journal: journalRecord,
},
})
}
}
return records, nil
}
Loading

0 comments on commit 86e9d37

Please sign in to comment.