Skip to content

Commit 4013d53

Browse files
committed
feat: add repo and processor
1 parent 4dcaddc commit 4013d53

14 files changed

+743
-3
lines changed

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
install:
22
go install github.com/spf13/cobra-cli@latest
33

4+
test:
5+
go test -race ./...
6+
7+
bench:
8+
go test ./... -bench=.
9+
410
serve:
511
go run main.go serve

docs/decisions.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,20 @@ This document will outline the technical and product decisions taken along the c
66

77
### Usage of Cobra
88

9-
Cobra CLI is a CLI helper that makes helps with the running of the service, so I opted to use it.
9+
Cobra CLI is a CLI helper that makes helps with the running of the service, so I opted to use it.
10+
11+
### Usage of mocks
12+
13+
There are interfaces in case I wanted to mock requests, I saw no need for that since I can easily test the full flow without depending on external sources or flaky I/O. Therefore I decided not to use mocks.
14+
15+
### Support for log values
16+
17+
I opted to support multiple value types for the logs but I will only test for strings, otherwise it would take me much more time. I can also remove the support.
18+
19+
### Log processing response
20+
21+
For this iteration I can either find or not the key, I will not be processing errors, therefore the return is always the same.
22+
23+
### Concurrency
24+
25+
There are unused structs such as concurrent repo and concurrent service that use a synchronized map with sharding and go routines respectively, with the benchmarks result the decision was to keep the simpler versions.

go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@ module pedroduarten9/oltp-log-processor
33
go 1.23.0
44

55
require (
6+
github.com/orcaman/concurrent-map/v2 v2.0.1
67
github.com/spf13/cobra v1.10.1
8+
github.com/stretchr/testify v1.11.1
79
go.opentelemetry.io/proto/otlp v1.8.0
810
)
911

1012
require (
13+
github.com/davecgh/go-spew v1.1.1 // indirect
1114
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
1215
github.com/inconshreveable/mousetrap v1.1.0 // indirect
16+
github.com/kr/text v0.2.0 // indirect
17+
github.com/pmezard/go-difflib v1.0.0 // indirect
18+
github.com/rogpeppe/go-internal v1.14.1 // indirect
1319
github.com/spf13/pflag v1.0.9 // indirect
1420
golang.org/x/net v0.43.0 // indirect
1521
golang.org/x/sys v0.35.0 // indirect
@@ -18,4 +24,5 @@ require (
1824
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
1925
google.golang.org/grpc v1.75.0 // indirect
2026
google.golang.org/protobuf v1.36.8 // indirect
27+
gopkg.in/yaml.v3 v3.0.1 // indirect
2128
)

go.sum

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
2+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
3+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
25
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
36
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
47
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
@@ -13,11 +16,23 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnV
1316
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs=
1417
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
1518
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
19+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
20+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
21+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
22+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
23+
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
24+
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
25+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
26+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
27+
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
28+
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
1629
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
1730
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
1831
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
1932
github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
2033
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
34+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
35+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
2136
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
2237
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
2338
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
@@ -49,4 +64,7 @@ google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr
4964
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
5065
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
5166
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
67+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
68+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
69+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
5270
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pkg/logs/concurrent_repo.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package logs
2+
3+
import cmap "github.com/orcaman/concurrent-map/v2"
4+
5+
type concurrentRepo struct {
6+
counts cmap.ConcurrentMap[string, int]
7+
}
8+
9+
func NewConcurrentRepo() *concurrentRepo {
10+
return &concurrentRepo{
11+
counts: cmap.New[int](),
12+
}
13+
}
14+
15+
func (r *concurrentRepo) IncrementAttribute(attr string) {
16+
val, _ := r.counts.Get(attr)
17+
r.counts.Set(attr, val+1)
18+
}
19+
20+
func (r *concurrentRepo) Reset() map[string]int {
21+
oldMap := r.counts
22+
r.counts = cmap.New[int]()
23+
return oldMap.Items()
24+
}

pkg/logs/concurrent_repo_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package logs
2+
3+
import (
4+
"strconv"
5+
"sync"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestConcurrentIncrementAttributeAndReset(t *testing.T) {
12+
repo := NewConcurrentRepo()
13+
14+
tests := []struct {
15+
name string
16+
attributesAndCount map[string]int
17+
}{
18+
{
19+
name: "single key",
20+
attributesAndCount: map[string]int{"key": 1},
21+
},
22+
{
23+
name: "single key multiple times",
24+
attributesAndCount: map[string]int{"key": 3},
25+
},
26+
{
27+
name: "multiple keys",
28+
attributesAndCount: map[string]int{"key": 3, "key2": 1, "key3": 4},
29+
},
30+
}
31+
32+
for _, tt := range tests {
33+
for k, v := range tt.attributesAndCount {
34+
for i := 0; i < v; i++ {
35+
repo.IncrementAttribute(k)
36+
}
37+
}
38+
39+
oldMap := repo.Reset()
40+
assert.Equal(t, tt.attributesAndCount, oldMap)
41+
assert.True(t, repo.counts.IsEmpty())
42+
}
43+
}
44+
45+
func BenchmarkConcurrentRepo_Serial(b *testing.B) {
46+
key := "foo"
47+
repo := NewConcurrentRepo()
48+
49+
b.ResetTimer()
50+
for i := 0; i < b.N; i++ {
51+
repo.IncrementAttribute(key)
52+
}
53+
}
54+
55+
func BenchmarkConcurrentRepo_Parallel(b *testing.B) {
56+
key := "foo"
57+
repo := NewConcurrentRepo()
58+
59+
b.ResetTimer()
60+
b.RunParallel(func(pb *testing.PB) {
61+
for pb.Next() {
62+
repo.IncrementAttribute(key)
63+
}
64+
})
65+
}
66+
67+
func BenchmarkConcurrentRepo_ManyKeys(b *testing.B) {
68+
repo := NewConcurrentRepo()
69+
70+
b.ResetTimer()
71+
for i := 0; i < b.N; i++ {
72+
val := strconv.Itoa(i % 1000)
73+
repo.IncrementAttribute(val)
74+
}
75+
}
76+
77+
func BenchmarkConcurrentRepo_WriteAndReset(b *testing.B) {
78+
repo := NewConcurrentRepo()
79+
80+
b.ResetTimer()
81+
for i := 0; i < b.N; i++ {
82+
val := strconv.Itoa(i % 1000)
83+
repo.IncrementAttribute(val)
84+
}
85+
86+
for i := 0; i < b.N; i++ {
87+
_ = repo.Reset()
88+
}
89+
}
90+
91+
func BenchmarkConcurrentRepo_ConcurrentWriteAndRead(b *testing.B) {
92+
key := "foo"
93+
repo := NewConcurrentRepo()
94+
95+
b.ResetTimer()
96+
var wg sync.WaitGroup
97+
for i := 0; i < b.N; i++ {
98+
wg.Add(2)
99+
go func() {
100+
repo.IncrementAttribute(key)
101+
wg.Done()
102+
}()
103+
go func() {
104+
_ = repo.Reset()
105+
wg.Done()
106+
}()
107+
}
108+
wg.Wait()
109+
}

pkg/logs/concurrent_service.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package logs
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
8+
)
9+
10+
type dash0LogsConcurrentServiceServer struct {
11+
logProcessor *LogProcessor
12+
13+
collogspb.UnimplementedLogsServiceServer
14+
}
15+
16+
func NewConcurrentServer(logProcessor *LogProcessor) collogspb.LogsServiceServer {
17+
return &dash0LogsConcurrentServiceServer{
18+
logProcessor: logProcessor,
19+
}
20+
}
21+
22+
func (l *dash0LogsConcurrentServiceServer) Export(ctx context.Context, request *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
23+
slog.DebugContext(ctx, "Received ExportLogsServiceRequest")
24+
25+
for _, resourceLogs := range request.ResourceLogs {
26+
go l.logProcessor.ProcessLog(resourceLogs)
27+
}
28+
29+
return &collogspb.ExportLogsServiceResponse{}, nil
30+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package logs
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
func BenchmarkConcurrentExport_9kBatch(b *testing.B) {
9+
key := "foo"
10+
ctx := context.Background()
11+
service := NewConcurrentServer(NewLogProcessor(key, NewRepo()))
12+
13+
for i := 0; i < b.N; i++ {
14+
logs := generateLog(key, 9000)
15+
service.Export(ctx, logs)
16+
}
17+
}

0 commit comments

Comments
 (0)