Skip to content

Commit 6bb4674

Browse files
committed
feat: add gRPC server
1 parent 9cef4e6 commit 6bb4674

File tree

13 files changed

+452
-71
lines changed

13 files changed

+452
-71
lines changed

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ FROM gcr.io/distroless/base
1414
WORKDIR /
1515

1616
COPY --from=build-stage --chown=serve:serve /app/serve .
17+
COPY --from=build-stage --chown=serve:serve /app/config.yaml .
1718

1819
USER nonroot:nonroot
1920

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
install:
2+
brew install grpcurl
23
go install github.com/spf13/cobra-cli@latest
34

45
test:

README.md

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ It encompasses documentation that helps extend this service and facilitates its
1111

1212
The decisions taken along the challenge will be documented [here](./docs/decisions.md)
1313

14+
## Flags
15+
16+
The configurable attributes `window in seconds` and `attribute key` can be configured either by changing the values on the `config.yaml` file or by passing flags via command line to the executable.
17+
Flags:
18+
- attrKey
19+
- windowSeconds
20+
Example:
21+
`go run main.go serve --attrKey=foo --windowSeconds=7`
22+
23+
The same command can be inferred either to be run via docker or via shell script.
24+
There are other flags that can be checked on `config.yaml` file
25+
1426
## How to run (via Golang)
1527

1628
To run the service you can either use Golang directly or use the helper Makefile, the commands for each of the approaches are below.
@@ -41,18 +53,26 @@ Have Docker installed
4153

4254
## How to run (via shell scripts)
4355

44-
## How to start the processor
56+
### How to start the processor
4557

4658
In order to start the server a helper file was created `run.sh`.runs the container with the application, when finished it removes the container.
4759

48-
## How to check the logs
60+
### How to check the logs
4961

5062
In order to check the logs of the application one can run `inspect.sh`. Once executed it will be continuously listen to the logs on the container.
5163

52-
## How to delete the resources created
64+
### How to delete the resources created
5365

5466
In order to delete the resources created a helper file was created `cleanup.sh`. This executable deletes the image.
5567

5668
### Prerequisites
5769

58-
Have Docker installed
70+
Have Docker installed
71+
72+
## Exercise the application
73+
74+
To exercise the application just run the server with the flags you wish.
75+
To run a request make sure you have brew installed and run `make install` to install a gRPC client (make sure you have brew installed).
76+
Otherwise, please, just use another client.
77+
Command example: `grpcurl -plaintext -d @ localhost:4317 opentelemetry.proto.collector.logs.v1.LogsService/Export < examples/log.json`
78+
Where you can change the address if it's different and provide different files.

cleanup.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#!/bin/bash
2-
set -e
3-
2+
docker stop log-processor
43
docker image rm log-processor
54
echo "Docker image 'log-processor' removed successfully."

cmd/root.go

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,24 @@
1-
/*
2-
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
3-
4-
*/
51
package cmd
62

73
import (
4+
"errors"
85
"os"
6+
"strings"
97

108
"github.com/spf13/cobra"
9+
"github.com/spf13/viper"
1110
)
1211

12+
var cfgFile string
1313

14-
15-
// rootCmd represents the base command when called without any subcommands
1614
var rootCmd = &cobra.Command{
1715
Use: "oltp-log-processor",
18-
Short: "A brief description of your application",
19-
Long: `A longer description that spans multiple lines and likely contains
20-
examples and usage of using your application. For example:
21-
22-
Cobra is a CLI library for Go that empowers applications.
23-
This application is a tool to generate the needed files
24-
to quickly create a Cobra application.`,
25-
// Uncomment the following line if your bare application
26-
// has an action associated with it:
27-
// Run: func(cmd *cobra.Command, args []string) { },
16+
Short: "OLTP log processor",
17+
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
18+
return initializeConfig(cmd)
19+
},
2820
}
2921

30-
// Execute adds all child commands to the root command and sets flags appropriately.
31-
// This is called by main.main(). It only needs to happen once to the rootCmd.
3222
func Execute() {
3323
err := rootCmd.Execute()
3424
if err != nil {
@@ -37,15 +27,36 @@ func Execute() {
3727
}
3828

3929
func init() {
40-
// Here you will define your flags and configuration settings.
41-
// Cobra supports persistent flags, which, if defined here,
42-
// will be global for your application.
30+
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default locations: ., $HOME/.myapp/)")
31+
}
4332

44-
// rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.oltp-log-processor.yaml)")
33+
func initializeConfig(cmd *cobra.Command) error {
34+
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "*", "-", "*"))
35+
viper.AutomaticEnv()
4536

46-
// Cobra also supports local flags, which will only run
47-
// when this action is called directly.
48-
rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
49-
}
37+
if cfgFile != "" {
38+
viper.SetConfigFile(cfgFile)
39+
} else {
40+
home, err := os.UserHomeDir()
41+
cobra.CheckErr(err)
5042

43+
viper.AddConfigPath(".")
44+
viper.AddConfigPath(home)
45+
viper.SetConfigName("config")
46+
viper.SetConfigType("yaml")
47+
}
5148

49+
if err := viper.ReadInConfig(); err != nil {
50+
var configFileNotFoundError viper.ConfigFileNotFoundError
51+
if !errors.As(err, &configFileNotFoundError) {
52+
return err
53+
}
54+
}
55+
56+
err := viper.BindPFlags(cmd.Flags())
57+
if err != nil {
58+
return err
59+
}
60+
61+
return nil
62+
}

cmd/serve.go

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,107 @@
1-
/*
2-
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
3-
4-
*/
51
package cmd
62

73
import (
4+
"context"
85
"fmt"
6+
"log/slog"
7+
"net"
8+
"os"
9+
"os/signal"
10+
"pedroduarten9/oltp-log-processor/pkg/logs"
11+
"time"
912

1013
"github.com/spf13/cobra"
14+
"github.com/spf13/viper"
15+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
16+
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
17+
"google.golang.org/grpc"
18+
"google.golang.org/grpc/credentials/insecure"
19+
"google.golang.org/grpc/reflection"
1120
)
1221

13-
// serveCmd represents the serve command
1422
var serveCmd = &cobra.Command{
1523
Use: "serve",
16-
Short: "A brief description of your command",
17-
Long: `A longer description that spans multiple lines and likely contains examples
18-
and usage of using your command. For example:
19-
20-
Cobra is a CLI library for Go that empowers applications.
21-
This application is a tool to generate the needed files
22-
to quickly create a Cobra application.`,
24+
Short: "A grpc server for the log service",
2325
Run: func(cmd *cobra.Command, args []string) {
24-
fmt.Println("serve called")
26+
windowSeconds := viper.GetInt("windowSeconds")
27+
maxReceiveMessageSize := viper.GetInt("maxReceiveMessageSize")
28+
attrKey := viper.GetString("attrKey")
29+
listenAddr := viper.GetString("listenAddr")
30+
logLevelCfg := viper.GetString("logLevel")
31+
32+
logLevel := *new(slog.Level)
33+
if err := logLevel.UnmarshalText([]byte(logLevelCfg)); err != nil {
34+
logLevel = slog.LevelInfo
35+
}
36+
logger := slog.New(slog.NewJSONHandler(
37+
os.Stdout,
38+
&slog.HandlerOptions{Level: logLevel}))
39+
slog.SetDefault(logger)
40+
41+
repo := logs.NewRepo()
42+
logProcessor := logs.NewLogProcessor(attrKey, repo)
43+
ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, os.Kill)
44+
start(ctx, logProcessor, logger, windowSeconds)
45+
46+
slog.Debug("Starting listener", slog.String("listenAddr", listenAddr))
47+
listener, err := net.Listen("tcp", listenAddr)
48+
if err != nil {
49+
slog.Error("failed to create listener")
50+
return
51+
}
52+
53+
grpcServer := grpc.NewServer(
54+
grpc.StatsHandler(otelgrpc.NewServerHandler()),
55+
grpc.MaxRecvMsgSize(maxReceiveMessageSize),
56+
grpc.Creds(insecure.NewCredentials()),
57+
)
58+
collogspb.RegisterLogsServiceServer(grpcServer, logs.NewServer(logProcessor))
59+
60+
reflection.Register(grpcServer)
61+
slog.Debug("Starting gRPC server")
62+
63+
go func() {
64+
if err := grpcServer.Serve(listener); err != nil {
65+
cancel()
66+
}
67+
}()
68+
69+
<-ctx.Done()
70+
71+
slog.Debug("Stopping gRPC server")
72+
grpcServer.GracefulStop()
73+
74+
_ = listener.Close()
2575
},
2676
}
2777

2878
func init() {
2979
rootCmd.AddCommand(serveCmd)
80+
serveCmd.Flags().Int("windowSeconds", 30, "Window of logging of attributes in seconds")
81+
serveCmd.Flags().Int("maxReceiveMessageSize", 16777216, "Max receive message size for the gRPC handler")
82+
serveCmd.Flags().String("attrKey", "foo", "The attribute key to look for")
83+
serveCmd.Flags().String("listenAddr", "localhost:4317", "The listen address for the gRPC server")
84+
serveCmd.Flags().String("logLevel", "INFO", "The log level of the system")
85+
}
3086

31-
// Here you will define your flags and configuration settings.
32-
33-
// Cobra supports Persistent Flags which will work for this command
34-
// and all subcommands, e.g.:
35-
// serveCmd.PersistentFlags().String("foo", "", "A help for foo")
36-
37-
// Cobra supports local flags which will only run when this command
38-
// is called directly, e.g.:
39-
// serveCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
87+
func start(ctx context.Context, lp *logs.LogProcessor, logger *slog.Logger, windowSeconds int) {
88+
ticker := time.NewTicker(time.Duration(windowSeconds) * time.Second)
89+
go func() {
90+
for {
91+
select {
92+
case <-ticker.C:
93+
report := lp.ReportAndReset()
94+
if len(report) == 0 {
95+
logger.Info(fmt.Sprintf("No logs found for window (%d seconds).", windowSeconds))
96+
break
97+
}
98+
intro := fmt.Sprintf("Reporting counts for window (%d seconds):", windowSeconds)
99+
logger.Info(intro)
100+
logger.Info(report)
101+
case <-ctx.Done():
102+
ticker.Stop()
103+
return
104+
}
105+
}
106+
}()
40107
}

cmd/test/serve_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"log"
6+
"net"
7+
"pedroduarten9/oltp-log-processor/pkg/logs"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
12+
otellogs "go.opentelemetry.io/proto/otlp/logs/v1"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/credentials/insecure"
15+
"google.golang.org/grpc/test/bufconn"
16+
)
17+
18+
func TestLogsServiceServer_Export(t *testing.T) {
19+
ctx := context.Background()
20+
21+
client, closer := server()
22+
defer closer()
23+
24+
type expectation struct {
25+
out *collogspb.ExportLogsServiceResponse
26+
err error
27+
}
28+
29+
tests := map[string]struct {
30+
in *collogspb.ExportLogsServiceRequest
31+
expected expectation
32+
}{
33+
"Must_Success": {
34+
in: &collogspb.ExportLogsServiceRequest{
35+
ResourceLogs: []*otellogs.ResourceLogs{
36+
{
37+
ScopeLogs: []*otellogs.ScopeLogs{},
38+
SchemaUrl: "dash0.com/otlp-log-processor-backend",
39+
},
40+
},
41+
},
42+
expected: expectation{
43+
out: &collogspb.ExportLogsServiceResponse{},
44+
err: nil,
45+
},
46+
},
47+
}
48+
49+
for scenario, tt := range tests {
50+
t.Run(scenario, func(t *testing.T) {
51+
out, err := client.Export(ctx, tt.in)
52+
if err != nil {
53+
assert.Equal(t, tt.expected.err, err)
54+
} else {
55+
expectedPartialSuccess := tt.expected.out.GetPartialSuccess()
56+
partialSuccess := out.GetPartialSuccess()
57+
assert.Equal(t, expectedPartialSuccess.GetRejectedLogRecords(), partialSuccess.GetRejectedLogRecords())
58+
assert.Equal(t, expectedPartialSuccess.GetErrorMessage(), partialSuccess.GetErrorMessage())
59+
}
60+
61+
})
62+
}
63+
}
64+
65+
func server() (collogspb.LogsServiceClient, func()) {
66+
addr := "localhost:4317"
67+
buffer := 101024 * 1024
68+
lis := bufconn.Listen(buffer)
69+
70+
var err error
71+
72+
baseServer := grpc.NewServer()
73+
logsProcessor := logs.NewLogProcessor("foo", logs.NewRepo())
74+
collogspb.RegisterLogsServiceServer(baseServer, logs.NewServer(logsProcessor))
75+
go func() {
76+
if err := baseServer.Serve(lis); err != nil {
77+
log.Printf("error serving server: %v", err)
78+
}
79+
}()
80+
81+
conn, err := grpc.NewClient(addr,
82+
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
83+
return lis.Dial()
84+
}), grpc.WithTransportCredentials(insecure.NewCredentials()))
85+
if err != nil {
86+
log.Printf("error connecting to server: %v", err)
87+
}
88+
89+
closer := func() {
90+
err := lis.Close()
91+
if err != nil {
92+
log.Printf("error closing listener: %v", err)
93+
}
94+
baseServer.Stop()
95+
}
96+
97+
client := collogspb.NewLogsServiceClient(conn)
98+
99+
return client, closer
100+
}

config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
listenAddr: "localhost:4317"
2+
maxReceiveMessageSize: 16777216
3+
windowSeconds: 5
4+
attrKey: "foo"
5+
logLevel: "DEBUG"

0 commit comments

Comments
 (0)