diff --git a/cmd/sansshell-server/default-policy.rego b/cmd/sansshell-server/default-policy.rego index a8479d9b..5d74f586 100644 --- a/cmd/sansshell-server/default-policy.rego +++ b/cmd/sansshell-server/default-policy.rego @@ -84,3 +84,7 @@ allow { allow { input.method = "/SysInfo.SysInfo/Dmesg" } + +allow { + input.method = "/SysInfo.SysInfo/Journal" +} diff --git a/services/sysinfo/client/client.go b/services/sysinfo/client/client.go index 23d9f5ff..fdeed33b 100644 --- a/services/sysinfo/client/client.go +++ b/services/sysinfo/client/client.go @@ -19,12 +19,17 @@ package client import ( "context" + "encoding/json" "flag" "fmt" "io" + "os" + "strings" + "time" "github.com/google/subcommands" "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/Snowflake-Labs/sansshell/client" pb "github.com/Snowflake-Labs/sansshell/services/sysinfo" @@ -41,6 +46,7 @@ func (*sysinfoCmd) GetSubpackage(f *flag.FlagSet) *subcommands.Commander { c := client.SetupSubpackage(subPackage, f) c.Register(&uptimeCmd{}, "") c.Register(&dmesgCmd{}, "") + c.Register(&journalCmd{}, "") return c } @@ -187,3 +193,168 @@ func (p *dmesgCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interfa } return exit } + +type journalCmd struct { + since string + until string + tail int64 + unit string + explain bool + output string +} + +func (*journalCmd) Name() string { return "journalctl" } +func (*journalCmd) Synopsis() string { return "Get the log entries stored in journald" } +func (*journalCmd) Usage() string { + return `journalCtl [--since|--S=X] [--until|-U=X] [-tail=X] [-u|-unit=X] [-o|--output=X] [-x] : + Get the log entries stored in journald by systemd-journald.service +` +} + +func (p *journalCmd) SetFlags(f *flag.FlagSet) { + f.StringVar(&p.since, "since", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter from") + f.StringVar(&p.since, "S", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter from (the date time is included)") + f.StringVar(&p.until, "until", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter until (the date time is not included)") + f.StringVar(&p.until, "U", "", "Sets the date (YYYY-MM-DD HH:MM:SS) we want to filter until") + f.StringVar(&p.unit, "unit", "", "Sets systemd unit to filter messages") + f.StringVar(&p.output, "output", "", "Sets the format of the journal entries that will be shown. Right now only json and json-pretty are supported.") + f.BoolVar(&p.explain, "x", false, "If true, augment log lines with explanatory texts from the message catalog.") + f.Int64Var(&p.tail, "tail", 100, "If positive, the latest n records to fetch. By default, fetch latest 100 records. If negative, fetch all records") +} + +func (p *journalCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { + state := args[0].(*util.ExecuteState) + c := pb.NewSysInfoClientProxy(state.Conn) + + // the output is case insensitive + p.output = strings.ToLower(p.output) + // currently output can only be json or json-pretty + if p.output != "" && p.output != "json" && p.output != "json-pretty" { + fmt.Fprintln(os.Stderr, "cannot set output to other formats unless json or json-pretty") + return subcommands.ExitUsageError + } + + req := &pb.JournalRequest{ + TailLine: int32(p.tail), + Explain: p.explain, + Unit: p.unit, + Output: p.output, + } + // Note: if timestamp is passed, don't forget to conver to UTC + expectedTimeFormat := "2006-01-02 15:04:05" + loc, err := time.LoadLocation("Local") + if err != nil { + fmt.Fprintln(os.Stderr, "cannot get local location") + return subcommands.ExitUsageError + } + if p.since != "" { + sinceTime, err := time.ParseInLocation(expectedTimeFormat, p.since, loc) + if err != nil { + fmt.Fprintln(os.Stderr, "please specify correct time pattern YYYY-MM-DD HH:MM:SS") + return subcommands.ExitUsageError + } + req.TimeSince = timestamppb.New(sinceTime.UTC()) + } + + if p.until != "" { + untilTime, err := time.ParseInLocation(expectedTimeFormat, p.until, loc) + if err != nil { + fmt.Fprintln(os.Stderr, "please specify correct time pattern YYYY-MM-DD HH:MM:SS") + return subcommands.ExitUsageError + } + req.TimeUntil = timestamppb.New(untilTime.UTC()) + } + + stream, err := c.JournalOneMany(ctx, req) + if err != nil { + // Emit this to every error file as it's not specific to a given target. + for _, e := range state.Err { + fmt.Fprintf(e, "All targets - could not info servers: %v\n", err) + } + return subcommands.ExitFailure + } + + targetsDone := make(map[int]bool) + exit := subcommands.ExitSuccess + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + // Emit this to every error file as it's not specific to a given target. + // But...we only do this for targets that aren't complete. A complete target + // didn't have an error. i.e. we got N done then the context expired. + for i, e := range state.Err { + if !targetsDone[i] { + fmt.Fprintf(e, "Stream error: %v\n", err) + } + } + exit = subcommands.ExitFailure + break + } + for i, r := range resp { + if r.Error != nil && r.Error != io.EOF { + fmt.Fprintf(state.Err[r.Index], "Target %s (%d) returned error - %v\n", r.Target, r.Index, r.Error) + targetsDone[r.Index] = true + // If any target had errors it needs to be reported for that target but we still + // need to process responses off the channel. Final return code though should + // indicate something failed. + exit = subcommands.ExitFailure + continue + } + // At EOF this target is done. + if r.Error == io.EOF { + targetsDone[r.Index] = true + continue + } + + switch t := r.Resp.Response.(type) { + case *pb.JournalReply_Journal: + journal := t.Journal + displayPid := "" + if journal.Pid != 0 { + + displayPid = fmt.Sprintf("[%d]", journal.Pid) + } + fmt.Fprintf(state.Out[i], "[%s] %s %s%s: %s\n", journal.RealtimeTimestamp.AsTime().Local(), journal.Hostname, journal.SyslogIdentifier, displayPid, journal.Message) + // process explanatory texts if exists + if journal.Catalog != "" { + lines := strings.Split(journal.Catalog, "\n") + // if last line is empty, just remove it + if len(lines) > 0 && lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + for idx := range lines { + lines[idx] = "-- " + lines[idx] + } + explainTexts := strings.Join(lines, "\n") + fmt.Fprintf(state.Out[i], "%s\n", explainTexts) + } + case *pb.JournalReply_JournalRaw: + journalRaw := t.JournalRaw + // Encode the map to JSON + var jsonData []byte + if p.output == "json" { + jsonData, err = json.Marshal(journalRaw.Entry) + if err != nil { + fmt.Fprintf(state.Err[r.Index], "Target %s (%d) returned cannot encode journal entry to JSON\n", r.Target, r.Index) + exit = subcommands.ExitFailure + continue + } + } else if p.output == "json-pretty" { + jsonData, err = json.MarshalIndent(journalRaw.Entry, "", " ") + if err != nil { + fmt.Fprintf(state.Err[r.Index], "Target %s (%d) returned cannot encode journal entry to pretty JSON\n", r.Target, r.Index) + exit = subcommands.ExitFailure + continue + } + } + // Convert the JSON data to a string + jsonString := string(jsonData) + fmt.Fprintf(state.Out[i], "%s\n", jsonString) + } + } + } + return exit +} diff --git a/services/sysinfo/server/sysinfo.go b/services/sysinfo/server/sysinfo.go index d61feef7..88db1e1a 100644 --- a/services/sysinfo/server/sysinfo.go +++ b/services/sysinfo/server/sysinfo.go @@ -40,6 +40,8 @@ var ( Description: "number of failures when performing sysinfo.Uptime"} sysinfoDmesgFailureCounter = metrics.MetricDefinition{Name: "actions_sysinfo_dmesg_failure", Description: "number of failures when performing sysinfo.Dmesg"} + sysinfoJournalFailureCounter = metrics.MetricDefinition{Name: "actions_sysinfo_journal_failure", + Description: "number of failures when performing sysinfo.Journal"} ) // server is used to implement the gRPC server @@ -113,6 +115,30 @@ func (s *server) Dmesg(req *pb.DmesgRequest, stream pb.SysInfo_DmesgServer) erro return nil } +func (s *server) Journal(req *pb.JournalRequest, stream pb.SysInfo_JournalServer) error { + ctx := stream.Context() + recorder := metrics.RecorderFromContextOrNoop(ctx) + + // currently output can only be json or json-pretty + if req.Output != "" && req.Output != "json" && req.Output != "json-pretty" { + return status.Errorf(codes.InvalidArgument, "cannot set output to other formats unless json or json-pretty") + } + + records, err := getJournalRecords(req) + if err != nil { + recorder.CounterOrLog(ctx, sysinfoUptimeFailureCounter, 1, attribute.String("reason", "get_journal_err")) + return err + } + + for idx := len(records) - 1; idx >= 0; idx-- { + if err := stream.Send(records[idx]); err != nil { + recorder.CounterOrLog(ctx, sysinfoJournalFailureCounter, 1, attribute.String("reason", "stream_send_err")) + return status.Errorf(codes.Internal, "journal: send error %v", err) + } + } + return nil +} + // Register is called to expose this handler to the gRPC server func (s *server) Register(gs *grpc.Server) { pb.RegisterSysInfoServer(gs, s) diff --git a/services/sysinfo/server/sysinfo_default.go b/services/sysinfo/server/sysinfo_default.go index 05230c7b..bd045adc 100644 --- a/services/sysinfo/server/sysinfo_default.go +++ b/services/sysinfo/server/sysinfo_default.go @@ -34,3 +34,7 @@ var getUptime = func() (time.Duration, error) { var getKernelMessages = func() ([]*pb.DmsgRecord, error) { return nil, status.Errorf(codes.Unimplemented, "dmesg is not supported") } + +var getJournalRecords = func(req *pb.JournalRequest) ([]*pb.JournalReply, error) { + return nil, status.Errorf(codes.Unimplemented, "journal is not supported") +} diff --git a/services/sysinfo/server/sysinfo_linux.go b/services/sysinfo/server/sysinfo_linux.go index 8b0dc26c..4fe11f2e 100644 --- a/services/sysinfo/server/sysinfo_linux.go +++ b/services/sysinfo/server/sysinfo_linux.go @@ -20,10 +20,11 @@ Copyright (c) 2023 Snowflake Inc. All rights reserved. package server import ( - "fmt" + "strconv" "time" pb "github.com/Snowflake-Labs/sansshell/services/sysinfo" + "github.com/coreos/go-systemd/v22/sdjournal" "github.com/euank/go-kmsg-parser/v2/kmsgparser" "golang.org/x/sys/unix" "google.golang.org/grpc/codes" @@ -36,10 +37,23 @@ var getKmsgParser = func() (kmsgparser.Parser, error) { return kmsgparser.NewParser() } +// provide journal interface for testing purpose +// in testing file, we can mock these functions +type journal interface { + SeekTail() error + Close() error + Previous() (uint64, error) + GetEntry() (*sdjournal.JournalEntry, error) + GetCatalog() (string, error) +} + +var getJournal = func() (journal, error) { + return sdjournal.NewJournal() +} + var getUptime = func() (time.Duration, error) { sysinfo := &unix.Sysinfo_t{} if err := unix.Sysinfo(sysinfo); err != nil { - fmt.Println(err) return 0, status.Errorf(codes.Internal, "err in get the system info from unix") } uptime := time.Duration(sysinfo.Uptime) * time.Second @@ -78,3 +92,103 @@ var getKernelMessages = func() ([]*pb.DmsgRecord, error) { } return records, nil } + +var getJournalRecords = func(req *pb.JournalRequest) ([]*pb.JournalReply, error) { + journal, err := getJournal() + if err != nil { + return nil, status.Errorf(codes.Internal, "sdjournal initializes Journal instance error: %v", err) + } + defer journal.Close() + + // Seek to the end of the journal entries + journal.SeekTail() + + var records []*pb.JournalReply + for { + // if collect expected log entries, break the loop + // if req.TailLine is a negative number or larger than number of entries in journal + // just fetch all journal entries + if len(records) == int(req.TailLine) { + break + } + // move read pointer back by on entry + n, err := journal.Previous() + if err != nil { + return nil, status.Errorf(codes.Internal, "sdjournal advances read pointer error: %v", err) + } + // break if there are no more entries left + if n == 0 { + break + } + + entry, err := journal.GetEntry() + if err != nil { + return nil, status.Errorf(codes.Internal, "sdjournal gets journal entry error: %v", err) + } + + // filter based on input time + realtime := timestamppb.New(time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))) + // req.TimeSince <= realtime <= req.TimeUntil + if req.TimeSince != nil && req.TimeSince.AsTime().After(realtime.AsTime()) { + continue + } + if req.TimeUntil != nil && req.TimeUntil.AsTime().Before(realtime.AsTime()) { + continue + } + + // filter based on systemd unit + // why `UNIT`, refer to doc: https://man.archlinux.org/man/systemd.journal-fields.7.en + if unit, systemdUnit := entry.Fields["UNIT"], entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]; req.Unit != "" && + unit != req.Unit && systemdUnit != req.Unit { + continue + } + + switch req.Output { + case "json", "json-pretty": + journalRecordRaw := &pb.JournalRecordRaw{} + journalRecordRaw.Entry = entry.Fields + entryMap := journalRecordRaw.Entry + // add cursor, realtime_timestamp and monotonic_timestamp to map + entryMap[sdjournal.SD_JOURNAL_FIELD_CURSOR] = entry.Cursor + entryMap[sdjournal.SD_JOURNAL_FIELD_REALTIME_TIMESTAMP] = strconv.FormatUint(entry.RealtimeTimestamp, 10) + entryMap[sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP] = strconv.FormatUint(entry.MonotonicTimestamp, 10) + records = append(records, &pb.JournalReply{ + Response: &pb.JournalReply_JournalRaw{ + JournalRaw: journalRecordRaw, + }, + }) + case "": + // default format + journalRecord := &pb.JournalRecord{} + journalRecord.RealtimeTimestamp = realtime + journalRecord.Hostname = entry.Fields[sdjournal.SD_JOURNAL_FIELD_HOSTNAME] + journalRecord.SyslogIdentifier = entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER] + journalRecord.Message = entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE] + + // some log entries may not have pid, since they are not generated by a process + if pidStr, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_PID]; ok { + pid, err := strconv.Atoi(pidStr) + if err != nil { + return nil, status.Errorf(codes.Internal, "sdjournal pid converts error: %v from string to int32", err) + } + journalRecord.Pid = int32(pid) + } + + // augument the records + // can only add explanatory text for those log entries that have `MESSAGE_ID` field + if _, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE_ID]; ok && req.Explain { + catalog, err := journal.GetCatalog() + if err != nil { + return nil, status.Errorf(codes.Internal, "sdjournal getCatalog error: %v", err) + } + journalRecord.Catalog = catalog + } + records = append(records, &pb.JournalReply{ + Response: &pb.JournalReply_Journal{ + Journal: journalRecord, + }, + }) + } + } + return records, nil +} diff --git a/services/sysinfo/server/sysinfo_linux_test.go b/services/sysinfo/server/sysinfo_linux_test.go index 981650b7..6397360f 100644 --- a/services/sysinfo/server/sysinfo_linux_test.go +++ b/services/sysinfo/server/sysinfo_linux_test.go @@ -21,11 +21,14 @@ package server import ( "context" + "encoding/json" "fmt" "io" "log" "net" "os" + "strconv" + "strings" "testing" "time" @@ -39,6 +42,7 @@ import ( pb "github.com/Snowflake-Labs/sansshell/services/sysinfo" "github.com/Snowflake-Labs/sansshell/testing/testutil" + "github.com/coreos/go-systemd/v22/sdjournal" "github.com/euank/go-kmsg-parser/v2/kmsgparser" ) @@ -418,3 +422,291 @@ func TestDmesg(t *testing.T) { }) } } + +func readJournalLog() ([]map[string]string, error) { + var result []map[string]string + testdataRaw := "./testdata/journal-log-entries.txt" + input, err := os.ReadFile(testdataRaw) + if err != nil { + return nil, err + } + lines := strings.Split(string(input), "\n") + for _, line := range lines { + if line == "" { + continue + } + var m map[string]string + if err := json.Unmarshal([]byte(line), &m); err != nil { + return nil, err + } + result = append(result, m) + } + return result, nil +} + +type MockJournal struct { + result []map[string]string + pointer int +} + +// read all entries from a file +func (j *MockJournal) SeekTail() error { + // start from the end to read + j.pointer = len(j.result) + return nil +} +func (j *MockJournal) Close() error { return nil } + +func (j *MockJournal) Previous() (uint64, error) { + remainingEntries := j.pointer + j.pointer-- + return uint64(remainingEntries), nil +} + +func (j *MockJournal) GetEntry() (*sdjournal.JournalEntry, error) { + rawMap := j.result[j.pointer] + realTimestamp, err := strconv.ParseUint(rawMap["__REALTIME_TIMESTAMP"], 10, 64) + if err != nil { + return nil, err + } + monotonicTimestamp, err := strconv.ParseUint(rawMap["__MONOTONIC_TIMESTAMP"], 10, 64) + if err != nil { + return nil, err + } + // copy rawMap + rawMapCopy := make(map[string]string) + for k, v := range rawMap { + rawMapCopy[k] = v + } + delete(rawMapCopy, "__CURSOR") + delete(rawMapCopy, "__REALTIME_TIMESTAMP") + delete(rawMapCopy, "__MONOTONIC_TIMESTAMP") + entry := &sdjournal.JournalEntry{ + Cursor: rawMap["__CURSOR"], + RealtimeTimestamp: realTimestamp, + MonotonicTimestamp: monotonicTimestamp, + Fields: rawMapCopy, + } + return entry, nil +} +func (j *MockJournal) GetCatalog() (string, error) { return "Subject: Unit YYYYY has begun XXXX", nil } + +func getJournalReply(raw map[string]string, wantRaw bool, wantCatalog bool, t *testing.T) *pb.JournalReply { + if wantRaw { + return &pb.JournalReply{ + Response: &pb.JournalReply_JournalRaw{ + JournalRaw: &pb.JournalRecordRaw{ + Entry: raw, + }, + }, + } + } + timestampInt, err := strconv.ParseInt(raw["__REALTIME_TIMESTAMP"], 10, 64) + testutil.FatalOnErr("Failed to convert realtimestamp from string to int64", err, t) + realtime := timestamppb.New(time.Unix(0, timestampInt*int64(time.Microsecond))) + pidInt, err := strconv.ParseInt(raw["_PID"], 10, 32) + testutil.FatalOnErr("Failed to convert pid from string to int32", err, t) + reply := &pb.JournalRecord{ + RealtimeTimestamp: realtime, + Hostname: raw["_HOSTNAME"], + SyslogIdentifier: raw["SYSLOG_IDENTIFIER"], + Pid: int32(pidInt), + Message: raw["MESSAGE"], + } + + if _, ok := raw["MESSAGE_ID"]; ok && wantCatalog { + reply.Catalog = "Subject: Unit YYYYY has begun XXXX" + } + + return &pb.JournalReply{ + Response: &pb.JournalReply_Journal{ + Journal: reply, + }, + } +} + +func TestJournal(t *testing.T) { + var err error + ctx := context.Background() + conn, err = grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials())) + testutil.FatalOnErr("Failed to dial bufnet", err, t) + t.Cleanup(func() { conn.Close() }) + + client := pb.NewSysInfoClient(conn) + + // prepare the data + rawDataList, err := readJournalLog() + testutil.FatalOnErr("Failed to read raw journal data", err, t) + // process from input time to timestamppb + startTime := "2023-07-27 17:19:00" + endTime := "2023-07-27 17:20:00" + expectedTimeFormat := "2006-01-02 15:04:05" + loc, err := time.LoadLocation("Local") + testutil.FatalOnErr("Failed to load local location", err, t) + sinceTime, err := time.ParseInLocation(expectedTimeFormat, startTime, loc) + testutil.FatalOnErr("Failed to convert since time in specified time zone", err, t) + untilTime, err := time.ParseInLocation(expectedTimeFormat, endTime, loc) + testutil.FatalOnErr("Failed to convert until time in specified time zone", err, t) + sinceTimeTimestamp := timestamppb.New(sinceTime) + untiltimeTimestamp := timestamppb.New(untilTime) + + savedGetJournalRecords := getJournalRecords + // create mockJournal + getJournal = func() (journal, error) { + mockJournal := &MockJournal{ + result: rawDataList, + } + return mockJournal, nil + } + + // test in Linux env + getJournalRecords = func(req *pb.JournalRequest) ([]*pb.JournalReply, error) { + records, err := savedGetJournalRecords(req) + if err != nil { + return nil, err + } + return records, nil + } + + for _, tc := range []struct { + name string + req *pb.JournalRequest + isExplain bool + isOutputJson bool + wantReply []*pb.JournalReply + wantErr bool + }{ + { + name: "fetch all journal entries: journalctl -tail -1", + req: &pb.JournalRequest{ + TailLine: -1, + }, + wantReply: []*pb.JournalReply{ + getJournalReply(rawDataList[0], false, false, t), + getJournalReply(rawDataList[1], false, false, t), + getJournalReply(rawDataList[2], false, false, t), + getJournalReply(rawDataList[3], false, false, t), + }, + }, + { + name: "fetch the latest one journal entry: journalctl -tail 1", + req: &pb.JournalRequest{ + TailLine: 1, + }, + wantReply: []*pb.JournalReply{ + getJournalReply(rawDataList[3], false, false, t), + }, + }, + { + name: "filter journal entries with systemd unit boot.mount and augument text: journalctl -x -unit boot.mount", + req: &pb.JournalRequest{ + TailLine: -1, + Explain: true, + Unit: "boot.mount", + }, + isExplain: true, + wantReply: []*pb.JournalReply{ + getJournalReply(rawDataList[0], false, true, t), + getJournalReply(rawDataList[1], false, true, t), + }, + }, + { + name: "filter journal entries based on time : journalctl --since '2023-07-27 17:19:00' --until '2023-07-27 17:20:00'", + req: &pb.JournalRequest{ + TailLine: -1, + TimeSince: sinceTimeTimestamp, + TimeUntil: untiltimeTimestamp, + }, + isExplain: true, + wantReply: []*pb.JournalReply{ + getJournalReply(rawDataList[2], false, false, t), + }, + }, + { + name: "fetch all journal entries: journalctl -tail -1", + req: &pb.JournalRequest{ + TailLine: -1, + }, + wantReply: []*pb.JournalReply{ + getJournalReply(rawDataList[0], false, false, t), + getJournalReply(rawDataList[1], false, false, t), + getJournalReply(rawDataList[2], false, false, t), + getJournalReply(rawDataList[3], false, false, t), + }, + }, + { + name: "fetch latest entry with json format: journalctl -tail 1 -output json", + req: &pb.JournalRequest{ + TailLine: 1, + Output: "json", + }, + wantReply: []*pb.JournalReply{ + getJournalReply(rawDataList[3], true, false, t), + }, + }, + { + name: "bad output input: journalctl -tail 1 -output XXX", + req: &pb.JournalRequest{ + TailLine: 1, + Output: "XXX", + }, + wantErr: true, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + stream, err := client.Journal(ctx, tc.req) + testutil.FatalOnErr("Journal failed", err, t) + var gotRecords []*pb.JournalReply + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + testutil.WantErr(tc.name, err, tc.wantErr, t) + if tc.wantErr { + // If this was an expected error we're done. + return + } + gotRecords = append(gotRecords, resp) + + } + // no matter the reply is JournalReply_Journal or JournalReply_JournalRaw, just compare them + testutil.DiffErr(tc.name, gotRecords, tc.wantReply, t) + }) + } + + // journal is not supported in other OS, so an error should be raised + getJournalRecords = func(req *pb.JournalRequest) ([]*pb.JournalReply, error) { + return nil, status.Errorf(codes.Unimplemented, "journal is not supported") + } + t.Cleanup(func() { getJournalRecords = savedGetJournalRecords }) + for _, tc := range []struct { + name string + req *pb.JournalRequest + wantErr bool + }{ + { + name: "journal action not supported in other OS except Linux right now", + req: &pb.JournalRequest{}, + wantErr: true, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + stream, _ := client.Journal(ctx, tc.req) + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + testutil.WantErr(tc.name, err, tc.wantErr, t) + if tc.wantErr { + // If this was an expected error we're done. + return + } + + } + }) + } +} diff --git a/services/sysinfo/server/testdata/journal-log-entries.txt b/services/sysinfo/server/testdata/journal-log-entries.txt new file mode 100644 index 00000000..214560e8 --- /dev/null +++ b/services/sysinfo/server/testdata/journal-log-entries.txt @@ -0,0 +1,4 @@ +{"CODE_FILE":"src/core/unit.c","CODE_FUNCTION":"unit_status_log_starting_stopping_reloading","CODE_LINE":"1439","MESSAGE":"Mounting /boot...","MESSAGE_ID":"7d4958e842da4a758f6c1cdc7b36dcc5","PRIORITY":"6","SYSLOG_FACILITY":"3","SYSLOG_IDENTIFIER":"systemd","UNIT":"boot.mount","_BOOT_ID":"41248570a6c54ec9b843acbc7fba97e0","_CAP_EFFECTIVE":"1ffffffffff","_CMDLINE":"/usr/lib/systemd/systemd --switched-root --system --deserialize 22","_COMM":"systemd","_EXE":"/usr/lib/systemd/systemd","_GID":"0","_HOSTNAME":"localhost.localdomain","_MACHINE_ID":"6cb5ed28584f4c4d8cdad5b612b94ab7","_PID":"1","_SOURCE_REALTIME_TIMESTAMP":"1687391641745520","_SYSTEMD_CGROUP":"/","_TRANSPORT":"journal","_UID":"0","__CURSOR":"s=6c11b383c7f24cc6995cab9e3a6303e1;i=261;b=41248570a6c54ec9b843acbc7fba97e0;m=44b23d;t=5feac7b6738de;x=a031ed549350a94","__MONOTONIC_TIMESTAMP":"4502077","__REALTIME_TIMESTAMP":"1687391641745630"} +{"CODE_FILE":"src/core/job.c","CODE_FUNCTION":"job_log_status_message","CODE_LINE":"774","MESSAGE":"Mounted /boot.","MESSAGE_ID":"39f53479d3a045ac8e11786248231fbf","PRIORITY":"6","RESULT":"done","SYSLOG_FACILITY":"3","SYSLOG_IDENTIFIER":"systemd","UNIT":"boot.mount","_BOOT_ID":"41248570a6c54ec9b843acbc7fba97e0","_CAP_EFFECTIVE":"1ffffffffff","_CMDLINE":"/usr/lib/systemd/systemd --switched-root --system --deserialize 22","_COMM":"systemd","_EXE":"/usr/lib/systemd/systemd","_GID":"0","_HOSTNAME":"localhost.localdomain","_MACHINE_ID":"6cb5ed28584f4c4d8cdad5b612b94ab7","_PID":"1","_SOURCE_REALTIME_TIMESTAMP":"1687391641756378","_SYSTEMD_CGROUP":"/","_TRANSPORT":"journal","_UID":"0","__CURSOR":"s=6c11b383c7f24cc6995cab9e3a6303e1;i=265;b=41248570a6c54ec9b843acbc7fba97e0;m=44dcc2;t=5feac7b676362;x=91f03280d2675f45","__MONOTONIC_TIMESTAMP":"4512962","__REALTIME_TIMESTAMP":"1687391641756514"} +{"MESSAGE":"Jul 27 17:19:00 1504 INFO] Sleeping","PRIORITY":"6","SYSLOG_FACILITY":"3","SYSLOG_IDENTIFIER":"python3","_BOOT_ID":"41248570a6c54ec9b843acbc7fba97e0","_CAP_EFFECTIVE":"0","_CMDLINE":"/usr/local/lib/nca/venv_nca/bin/python3 /usr/local/lib/nca/nca.py --src /var/local/nca/incoming --backup /var/local/nca/consumed --template /usr/local/lib/nca/nginx_templates/snowflake.tmpl --dst /etc/nginx/nca_deployments --domain=.local --verbose","_COMM":"python3","_EXE":"/usr/bin/python3.6","_GID":"975","_HOSTNAME":"SDP_DevVM-pchu","_MACHINE_ID":"6cb5ed28584f4c4d8cdad5b612b94ab7","_PID":"1504","_STREAM_ID":"0dd9036be5f44f06963966114ccff956","_SYSTEMD_CGROUP":"/system.slice/nca.service","_SYSTEMD_SLICE":"system.slice","_SYSTEMD_UNIT":"nca.service","_TRANSPORT":"stdout","_UID":"984","__CURSOR":"s=6c11b383c7f24cc6995cab9e3a6303e1;i=1ca02;b=41248570a6c54ec9b843acbc7fba97e0;m=10e2ecb4920;t=6017b28f61707;x=fdc50e68d98a08b1","__MONOTONIC_TIMESTAMP":"1160426244384","__REALTIME_TIMESTAMP":"1690478340085511"} +{"MESSAGE":"Selected source 162.159.200.1","PRIORITY":"6","SYSLOG_FACILITY":"3","SYSLOG_IDENTIFIER":"chronyd","SYSLOG_PID":"748","_BOOT_ID":"41248570a6c54ec9b843acbc7fba97e0","_CAP_EFFECTIVE":"2000400","_CMDLINE":"/usr/sbin/chronyd","_COMM":"chronyd","_EXE":"/usr/sbin/chronyd","_GID":"989","_HOSTNAME":"SDP_DevVM-pchu","_MACHINE_ID":"6cb5ed28584f4c4d8cdad5b612b94ab7","_PID":"748","_SOURCE_REALTIME_TIMESTAMP":"1690478406262876","_SYSTEMD_CGROUP":"/system.slice/chronyd.service","_SYSTEMD_SLICE":"system.slice","_SYSTEMD_UNIT":"chronyd.service","_TRANSPORT":"syslog","_UID":"993","__CURSOR":"s=6c11b383c7f24cc6995cab9e3a6303e1;i=1ca06;b=41248570a6c54ec9b843acbc7fba97e0;m=10e32bd1520;t=6017b2ce7e307;x=f642c77493bf0037","__MONOTONIC_TIMESTAMP":"1160492422432","__REALTIME_TIMESTAMP":"1690478406263559"} \ No newline at end of file diff --git a/services/sysinfo/sysinfo.pb.go b/services/sysinfo/sysinfo.pb.go index 7d50a9a9..469bf047 100644 --- a/services/sysinfo/sysinfo.pb.go +++ b/services/sysinfo/sysinfo.pb.go @@ -263,6 +263,323 @@ func (x *DmsgRecord) GetMessage() string { return "" } +type JournalRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The start time for query, timestamp format will be YYYY-MM-DD HH:MM:SS + TimeSince *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=time_since,json=timeSince,proto3" json:"time_since,omitempty"` + // The end time for query + TimeUntil *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=time_until,json=timeUntil,proto3" json:"time_until,omitempty"` + // Tail the latest number of log entries from the journal, negative means display all messages + TailLine int32 `protobuf:"varint,3,opt,name=tail_line,json=tailLine,proto3" json:"tail_line,omitempty"` + // Filter messages for specified systemd units + Unit string `protobuf:"bytes,4,opt,name=unit,proto3" json:"unit,omitempty"` + // Controls the format of the journal entries + Output string `protobuf:"bytes,5,opt,name=output,proto3" json:"output,omitempty"` + // If true, add explanatory help texts to log messages in the output + // default value is false + Explain bool `protobuf:"varint,6,opt,name=explain,proto3" json:"explain,omitempty"` +} + +func (x *JournalRequest) Reset() { + *x = JournalRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_sysinfo_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JournalRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JournalRequest) ProtoMessage() {} + +func (x *JournalRequest) ProtoReflect() protoreflect.Message { + mi := &file_sysinfo_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JournalRequest.ProtoReflect.Descriptor instead. +func (*JournalRequest) Descriptor() ([]byte, []int) { + return file_sysinfo_proto_rawDescGZIP(), []int{4} +} + +func (x *JournalRequest) GetTimeSince() *timestamppb.Timestamp { + if x != nil { + return x.TimeSince + } + return nil +} + +func (x *JournalRequest) GetTimeUntil() *timestamppb.Timestamp { + if x != nil { + return x.TimeUntil + } + return nil +} + +func (x *JournalRequest) GetTailLine() int32 { + if x != nil { + return x.TailLine + } + return 0 +} + +func (x *JournalRequest) GetUnit() string { + if x != nil { + return x.Unit + } + return "" +} + +func (x *JournalRequest) GetOutput() string { + if x != nil { + return x.Output + } + return "" +} + +func (x *JournalRequest) GetExplain() bool { + if x != nil { + return x.Explain + } + return false +} + +type JournalReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Response: + // + // *JournalReply_Journal + // *JournalReply_JournalRaw + Response isJournalReply_Response `protobuf_oneof:"response"` +} + +func (x *JournalReply) Reset() { + *x = JournalReply{} + if protoimpl.UnsafeEnabled { + mi := &file_sysinfo_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JournalReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JournalReply) ProtoMessage() {} + +func (x *JournalReply) ProtoReflect() protoreflect.Message { + mi := &file_sysinfo_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JournalReply.ProtoReflect.Descriptor instead. +func (*JournalReply) Descriptor() ([]byte, []int) { + return file_sysinfo_proto_rawDescGZIP(), []int{5} +} + +func (m *JournalReply) GetResponse() isJournalReply_Response { + if m != nil { + return m.Response + } + return nil +} + +func (x *JournalReply) GetJournal() *JournalRecord { + if x, ok := x.GetResponse().(*JournalReply_Journal); ok { + return x.Journal + } + return nil +} + +func (x *JournalReply) GetJournalRaw() *JournalRecordRaw { + if x, ok := x.GetResponse().(*JournalReply_JournalRaw); ok { + return x.JournalRaw + } + return nil +} + +type isJournalReply_Response interface { + isJournalReply_Response() +} + +type JournalReply_Journal struct { + Journal *JournalRecord `protobuf:"bytes,1,opt,name=journal,proto3,oneof"` +} + +type JournalReply_JournalRaw struct { + JournalRaw *JournalRecordRaw `protobuf:"bytes,2,opt,name=journalRaw,proto3,oneof"` +} + +func (*JournalReply_Journal) isJournalReply_Response() {} + +func (*JournalReply_JournalRaw) isJournalReply_Response() {} + +// journal record is the default format +// and contains fields the same as journalctl output in linux +type JournalRecord struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RealtimeTimestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=realtime_timestamp,json=realtimeTimestamp,proto3" json:"realtime_timestamp,omitempty"` + Hostname string `protobuf:"bytes,2,opt,name=hostname,proto3" json:"hostname,omitempty"` + SyslogIdentifier string `protobuf:"bytes,3,opt,name=syslog_identifier,json=syslogIdentifier,proto3" json:"syslog_identifier,omitempty"` + Pid int32 `protobuf:"varint,4,opt,name=pid,proto3" json:"pid,omitempty"` + Message string `protobuf:"bytes,5,opt,name=message,proto3" json:"message,omitempty"` + // contains explanatory texts if exist and explain is set to true + Catalog string `protobuf:"bytes,6,opt,name=catalog,proto3" json:"catalog,omitempty"` +} + +func (x *JournalRecord) Reset() { + *x = JournalRecord{} + if protoimpl.UnsafeEnabled { + mi := &file_sysinfo_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JournalRecord) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JournalRecord) ProtoMessage() {} + +func (x *JournalRecord) ProtoReflect() protoreflect.Message { + mi := &file_sysinfo_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JournalRecord.ProtoReflect.Descriptor instead. +func (*JournalRecord) Descriptor() ([]byte, []int) { + return file_sysinfo_proto_rawDescGZIP(), []int{6} +} + +func (x *JournalRecord) GetRealtimeTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.RealtimeTimestamp + } + return nil +} + +func (x *JournalRecord) GetHostname() string { + if x != nil { + return x.Hostname + } + return "" +} + +func (x *JournalRecord) GetSyslogIdentifier() string { + if x != nil { + return x.SyslogIdentifier + } + return "" +} + +func (x *JournalRecord) GetPid() int32 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *JournalRecord) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *JournalRecord) GetCatalog() string { + if x != nil { + return x.Catalog + } + return "" +} + +// raw journal record will contain most fields +// and display in specified output format +type JournalRecordRaw struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // when output is set to json or json-pretty + // a list of key-value pairs will be set here + // the key-value pairs will be different with different messages + Entry map[string]string `protobuf:"bytes,1,rep,name=entry,proto3" json:"entry,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *JournalRecordRaw) Reset() { + *x = JournalRecordRaw{} + if protoimpl.UnsafeEnabled { + mi := &file_sysinfo_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JournalRecordRaw) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JournalRecordRaw) ProtoMessage() {} + +func (x *JournalRecordRaw) ProtoReflect() protoreflect.Message { + mi := &file_sysinfo_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JournalRecordRaw.ProtoReflect.Descriptor instead. +func (*JournalRecordRaw) Descriptor() ([]byte, []int) { + return file_sysinfo_proto_rawDescGZIP(), []int{7} +} + +func (x *JournalRecordRaw) GetEntry() map[string]string { + if x != nil { + return x.Entry + } + return nil +} + var File_sysinfo_proto protoreflect.FileDescriptor var file_sysinfo_proto_rawDesc = []byte{ @@ -296,18 +613,70 @@ var file_sysinfo_proto_rawDesc = []byte{ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x32, 0x7c, 0x0a, 0x07, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x38, 0x0a, - 0x06, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, - 0x14, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x05, 0x44, 0x6d, 0x65, 0x73, 0x67, - 0x12, 0x15, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x44, 0x6d, 0x65, 0x73, 0x67, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, - 0x6f, 0x2e, 0x44, 0x6d, 0x65, 0x73, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x30, 0x01, - 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x53, - 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x73, 0x61, - 0x6e, 0x73, 0x73, 0x68, 0x65, 0x6c, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, - 0x2f, 0x73, 0x79, 0x73, 0x69, 0x6e, 0x66, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x67, 0x65, 0x22, 0xe9, 0x01, 0x0a, 0x0e, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x69, + 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x69, 0x6e, 0x63, 0x65, + 0x12, 0x39, 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x75, 0x6e, 0x74, 0x69, 0x6c, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x55, 0x6e, 0x74, 0x69, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x74, + 0x61, 0x69, 0x6c, 0x5f, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, + 0x74, 0x61, 0x69, 0x6c, 0x4c, 0x69, 0x6e, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x6e, 0x69, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x6e, 0x69, 0x74, 0x12, 0x16, 0x0a, 0x06, + 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x22, 0x8b, + 0x01, 0x0a, 0x0c, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, + 0x32, 0x0a, 0x07, 0x6a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x16, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4a, 0x6f, 0x75, 0x72, 0x6e, + 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x48, 0x00, 0x52, 0x07, 0x6a, 0x6f, 0x75, 0x72, + 0x6e, 0x61, 0x6c, 0x12, 0x3b, 0x0a, 0x0a, 0x6a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x61, + 0x77, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, + 0x6f, 0x2e, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, + 0x61, 0x77, 0x48, 0x00, 0x52, 0x0a, 0x6a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x61, 0x77, + 0x42, 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xe9, 0x01, 0x0a, + 0x0d, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x49, + 0x0a, 0x12, 0x72, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x11, 0x72, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, + 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73, + 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x73, 0x79, 0x73, 0x6c, 0x6f, 0x67, 0x5f, + 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x10, 0x73, 0x79, 0x73, 0x6c, 0x6f, 0x67, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, + 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x03, 0x70, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, + 0x0a, 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x22, 0x88, 0x01, 0x0a, 0x10, 0x4a, 0x6f, 0x75, + 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x61, 0x77, 0x12, 0x3a, 0x0a, + 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x53, + 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x52, 0x61, 0x77, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x1a, 0x38, 0x0a, 0x0a, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x32, 0xbb, 0x01, 0x0a, 0x07, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, + 0x38, 0x0a, 0x06, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x1a, 0x14, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x55, 0x70, 0x74, 0x69, + 0x6d, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x05, 0x44, 0x6d, 0x65, + 0x73, 0x67, 0x12, 0x15, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x44, 0x6d, 0x65, + 0x73, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x53, 0x79, 0x73, 0x49, + 0x6e, 0x66, 0x6f, 0x2e, 0x44, 0x6d, 0x65, 0x73, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, + 0x30, 0x01, 0x12, 0x3d, 0x0a, 0x07, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x17, 0x2e, + 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, + 0x2e, 0x4a, 0x6f, 0x75, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x30, + 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x73, + 0x61, 0x6e, 0x73, 0x73, 0x68, 0x65, 0x6c, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x2f, 0x73, 0x79, 0x73, 0x69, 0x6e, 0x66, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -322,29 +691,42 @@ func file_sysinfo_proto_rawDescGZIP() []byte { return file_sysinfo_proto_rawDescData } -var file_sysinfo_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_sysinfo_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_sysinfo_proto_goTypes = []interface{}{ (*UptimeReply)(nil), // 0: SysInfo.UptimeReply (*DmesgRequest)(nil), // 1: SysInfo.DmesgRequest (*DmesgReply)(nil), // 2: SysInfo.DmesgReply (*DmsgRecord)(nil), // 3: SysInfo.DmsgRecord - (*durationpb.Duration)(nil), // 4: google.protobuf.Duration - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*JournalRequest)(nil), // 4: SysInfo.JournalRequest + (*JournalReply)(nil), // 5: SysInfo.JournalReply + (*JournalRecord)(nil), // 6: SysInfo.JournalRecord + (*JournalRecordRaw)(nil), // 7: SysInfo.JournalRecordRaw + nil, // 8: SysInfo.JournalRecordRaw.EntryEntry + (*durationpb.Duration)(nil), // 9: google.protobuf.Duration + (*timestamppb.Timestamp)(nil), // 10: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 11: google.protobuf.Empty } var file_sysinfo_proto_depIdxs = []int32{ - 4, // 0: SysInfo.UptimeReply.uptime_seconds:type_name -> google.protobuf.Duration - 3, // 1: SysInfo.DmesgReply.record:type_name -> SysInfo.DmsgRecord - 5, // 2: SysInfo.DmsgRecord.time:type_name -> google.protobuf.Timestamp - 6, // 3: SysInfo.SysInfo.Uptime:input_type -> google.protobuf.Empty - 1, // 4: SysInfo.SysInfo.Dmesg:input_type -> SysInfo.DmesgRequest - 0, // 5: SysInfo.SysInfo.Uptime:output_type -> SysInfo.UptimeReply - 2, // 6: SysInfo.SysInfo.Dmesg:output_type -> SysInfo.DmesgReply - 5, // [5:7] is the sub-list for method output_type - 3, // [3:5] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 9, // 0: SysInfo.UptimeReply.uptime_seconds:type_name -> google.protobuf.Duration + 3, // 1: SysInfo.DmesgReply.record:type_name -> SysInfo.DmsgRecord + 10, // 2: SysInfo.DmsgRecord.time:type_name -> google.protobuf.Timestamp + 10, // 3: SysInfo.JournalRequest.time_since:type_name -> google.protobuf.Timestamp + 10, // 4: SysInfo.JournalRequest.time_until:type_name -> google.protobuf.Timestamp + 6, // 5: SysInfo.JournalReply.journal:type_name -> SysInfo.JournalRecord + 7, // 6: SysInfo.JournalReply.journalRaw:type_name -> SysInfo.JournalRecordRaw + 10, // 7: SysInfo.JournalRecord.realtime_timestamp:type_name -> google.protobuf.Timestamp + 8, // 8: SysInfo.JournalRecordRaw.entry:type_name -> SysInfo.JournalRecordRaw.EntryEntry + 11, // 9: SysInfo.SysInfo.Uptime:input_type -> google.protobuf.Empty + 1, // 10: SysInfo.SysInfo.Dmesg:input_type -> SysInfo.DmesgRequest + 4, // 11: SysInfo.SysInfo.Journal:input_type -> SysInfo.JournalRequest + 0, // 12: SysInfo.SysInfo.Uptime:output_type -> SysInfo.UptimeReply + 2, // 13: SysInfo.SysInfo.Dmesg:output_type -> SysInfo.DmesgReply + 5, // 14: SysInfo.SysInfo.Journal:output_type -> SysInfo.JournalReply + 12, // [12:15] is the sub-list for method output_type + 9, // [9:12] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_sysinfo_proto_init() } @@ -401,6 +783,58 @@ func file_sysinfo_proto_init() { return nil } } + file_sysinfo_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JournalRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sysinfo_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JournalReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sysinfo_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JournalRecord); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sysinfo_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JournalRecordRaw); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_sysinfo_proto_msgTypes[5].OneofWrappers = []interface{}{ + (*JournalReply_Journal)(nil), + (*JournalReply_JournalRaw)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -408,7 +842,7 @@ func file_sysinfo_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_sysinfo_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/services/sysinfo/sysinfo.proto b/services/sysinfo/sysinfo.proto index 84ae388c..a7210558 100644 --- a/services/sysinfo/sysinfo.proto +++ b/services/sysinfo/sysinfo.proto @@ -26,10 +26,11 @@ import "google/protobuf/timestamp.proto"; // The SysInfo service definition. service SysInfo { - // Uptime return + // Uptime return the system uptime since last shutdown or reboot rpc Uptime(google.protobuf.Empty) returns (UptimeReply) {} // display kernel-related messages rpc Dmesg(DmesgRequest) returns (stream DmesgReply) {} + rpc Journal(JournalRequest) returns (stream JournalReply) {} } message UptimeReply { @@ -56,3 +57,47 @@ message DmsgRecord{ google.protobuf.Timestamp time = 1; string message = 2; } + +message JournalRequest { + // The start time for query, timestamp format will be YYYY-MM-DD HH:MM:SS + google.protobuf.Timestamp time_since = 1; + // The end time for query + google.protobuf.Timestamp time_until = 2; + // Tail the latest number of log entries from the journal, negative means display all messages + int32 tail_line = 3; + // Filter messages for specified systemd units + string unit = 4; + // Controls the format of the journal entries + string output = 5; + // If true, add explanatory help texts to log messages in the output + // default value is false + bool explain = 6; +} + +message JournalReply { + oneof response { + JournalRecord journal = 1; + JournalRecordRaw journalRaw = 2; + } +} + +// journal record is the default format +// and contains fields the same as journalctl output in linux +message JournalRecord { + google.protobuf.Timestamp realtime_timestamp = 1; + string hostname = 2; + string syslog_identifier = 3; + int32 pid = 4; + string message = 5; + // contains explanatory texts if exist and explain is set to true + string catalog = 6; +} + +// raw journal record will contain most fields +// and display in specified output format +message JournalRecordRaw { + // when output is set to json or json-pretty + // a list of key-value pairs will be set here + // the key-value pairs will be different with different messages + map entry = 1; +} diff --git a/services/sysinfo/sysinfo_grpc.pb.go b/services/sysinfo/sysinfo_grpc.pb.go index fbe4e170..0a6888d0 100644 --- a/services/sysinfo/sysinfo_grpc.pb.go +++ b/services/sysinfo/sysinfo_grpc.pb.go @@ -35,18 +35,20 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - SysInfo_Uptime_FullMethodName = "/SysInfo.SysInfo/Uptime" - SysInfo_Dmesg_FullMethodName = "/SysInfo.SysInfo/Dmesg" + SysInfo_Uptime_FullMethodName = "/SysInfo.SysInfo/Uptime" + SysInfo_Dmesg_FullMethodName = "/SysInfo.SysInfo/Dmesg" + SysInfo_Journal_FullMethodName = "/SysInfo.SysInfo/Journal" ) // SysInfoClient is the client API for SysInfo service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type SysInfoClient interface { - // Uptime return + // Uptime return the system uptime since last shutdown or reboot Uptime(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*UptimeReply, error) // display kernel-related messages Dmesg(ctx context.Context, in *DmesgRequest, opts ...grpc.CallOption) (SysInfo_DmesgClient, error) + Journal(ctx context.Context, in *JournalRequest, opts ...grpc.CallOption) (SysInfo_JournalClient, error) } type sysInfoClient struct { @@ -98,14 +100,47 @@ func (x *sysInfoDmesgClient) Recv() (*DmesgReply, error) { return m, nil } +func (c *sysInfoClient) Journal(ctx context.Context, in *JournalRequest, opts ...grpc.CallOption) (SysInfo_JournalClient, error) { + stream, err := c.cc.NewStream(ctx, &SysInfo_ServiceDesc.Streams[1], SysInfo_Journal_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &sysInfoJournalClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type SysInfo_JournalClient interface { + Recv() (*JournalReply, error) + grpc.ClientStream +} + +type sysInfoJournalClient struct { + grpc.ClientStream +} + +func (x *sysInfoJournalClient) Recv() (*JournalReply, error) { + m := new(JournalReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // SysInfoServer is the server API for SysInfo service. // All implementations should embed UnimplementedSysInfoServer // for forward compatibility type SysInfoServer interface { - // Uptime return + // Uptime return the system uptime since last shutdown or reboot Uptime(context.Context, *emptypb.Empty) (*UptimeReply, error) // display kernel-related messages Dmesg(*DmesgRequest, SysInfo_DmesgServer) error + Journal(*JournalRequest, SysInfo_JournalServer) error } // UnimplementedSysInfoServer should be embedded to have forward compatible implementations. @@ -118,6 +153,9 @@ func (UnimplementedSysInfoServer) Uptime(context.Context, *emptypb.Empty) (*Upti func (UnimplementedSysInfoServer) Dmesg(*DmesgRequest, SysInfo_DmesgServer) error { return status.Errorf(codes.Unimplemented, "method Dmesg not implemented") } +func (UnimplementedSysInfoServer) Journal(*JournalRequest, SysInfo_JournalServer) error { + return status.Errorf(codes.Unimplemented, "method Journal not implemented") +} // UnsafeSysInfoServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to SysInfoServer will @@ -169,6 +207,27 @@ func (x *sysInfoDmesgServer) Send(m *DmesgReply) error { return x.ServerStream.SendMsg(m) } +func _SysInfo_Journal_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(JournalRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SysInfoServer).Journal(m, &sysInfoJournalServer{stream}) +} + +type SysInfo_JournalServer interface { + Send(*JournalReply) error + grpc.ServerStream +} + +type sysInfoJournalServer struct { + grpc.ServerStream +} + +func (x *sysInfoJournalServer) Send(m *JournalReply) error { + return x.ServerStream.SendMsg(m) +} + // SysInfo_ServiceDesc is the grpc.ServiceDesc for SysInfo service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -187,6 +246,11 @@ var SysInfo_ServiceDesc = grpc.ServiceDesc{ Handler: _SysInfo_Dmesg_Handler, ServerStreams: true, }, + { + StreamName: "Journal", + Handler: _SysInfo_Journal_Handler, + ServerStreams: true, + }, }, Metadata: "sysinfo.proto", } diff --git a/services/sysinfo/sysinfo_grpcproxy.pb.go b/services/sysinfo/sysinfo_grpcproxy.pb.go index 7edfa61b..5aa9d47a 100644 --- a/services/sysinfo/sysinfo_grpcproxy.pb.go +++ b/services/sysinfo/sysinfo_grpcproxy.pb.go @@ -22,6 +22,7 @@ type SysInfoClientProxy interface { SysInfoClient UptimeOneMany(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (<-chan *UptimeManyResponse, error) DmesgOneMany(ctx context.Context, in *DmesgRequest, opts ...grpc.CallOption) (SysInfo_DmesgClientProxy, error) + JournalOneMany(ctx context.Context, in *JournalRequest, opts ...grpc.CallOption) (SysInfo_JournalClientProxy, error) } // Embed the original client inside of this so we get the other generated methods automatically. @@ -189,3 +190,91 @@ func (c *sysInfoClientProxy) DmesgOneMany(ctx context.Context, in *DmesgRequest, } return x, nil } + +// JournalManyResponse encapsulates a proxy data packet. +// It includes the target, index, response and possible error returned. +type JournalManyResponse struct { + Target string + // As targets can be duplicated this is the index into the slice passed to proxy.Conn. + Index int + Resp *JournalReply + Error error +} + +type SysInfo_JournalClientProxy interface { + Recv() ([]*JournalManyResponse, error) + grpc.ClientStream +} + +type sysInfoClientJournalClientProxy struct { + cc *proxy.Conn + directDone bool + grpc.ClientStream +} + +func (x *sysInfoClientJournalClientProxy) Recv() ([]*JournalManyResponse, error) { + var ret []*JournalManyResponse + // If this is a direct connection the RecvMsg call is to a standard grpc.ClientStream + // and not our proxy based one. This means we need to receive a typed response and + // convert it into a single slice entry return. This ensures the OneMany style calls + // can be used by proxy with 1:N targets and non proxy with 1 target without client changes. + if x.cc.Direct() { + // Check if we're done. Just return EOF now. Any real error was already sent inside + // of a ManyResponse. + if x.directDone { + return nil, io.EOF + } + m := &JournalReply{} + err := x.ClientStream.RecvMsg(m) + ret = append(ret, &JournalManyResponse{ + Resp: m, + Error: err, + Target: x.cc.Targets[0], + Index: 0, + }) + // An error means we're done so set things so a later call now gets an EOF. + if err != nil { + x.directDone = true + } + return ret, nil + } + + m := []*proxy.Ret{} + if err := x.ClientStream.RecvMsg(&m); err != nil { + return nil, err + } + for _, r := range m { + typedResp := &JournalManyResponse{ + Resp: &JournalReply{}, + } + typedResp.Target = r.Target + typedResp.Index = r.Index + typedResp.Error = r.Error + if r.Error == nil { + if err := r.Resp.UnmarshalTo(typedResp.Resp); err != nil { + typedResp.Error = fmt.Errorf("can't decode any response - %v. Original Error - %v", err, r.Error) + } + } + ret = append(ret, typedResp) + } + return ret, nil +} + +// JournalOneMany provides the same API as Journal but sends the same request to N destinations at once. +// N can be a single destination. +// +// NOTE: The returned channel must be read until it closes in order to avoid leaking goroutines. +func (c *sysInfoClientProxy) JournalOneMany(ctx context.Context, in *JournalRequest, opts ...grpc.CallOption) (SysInfo_JournalClientProxy, error) { + stream, err := c.cc.NewStream(ctx, &SysInfo_ServiceDesc.Streams[1], "/SysInfo.SysInfo/Journal", opts...) + if err != nil { + return nil, err + } + x := &sysInfoClientJournalClientProxy{c.cc.(*proxy.Conn), false, stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +}