From eecb3a0476d81cbab149ce4dedac09e4161054c0 Mon Sep 17 00:00:00 2001 From: Sean Marciniak Date: Fri, 6 Dec 2024 11:13:54 +1030 Subject: [PATCH] Migrating s3 exporter to use aws sdk v2 This change migrates the exporter to use sdk v2 while also providing some code improvements. --- exporter/awss3exporter/config.go | 32 ++- exporter/awss3exporter/exporter.go | 24 ++- exporter/awss3exporter/exporter_test.go | 10 +- exporter/awss3exporter/factory.go | 6 +- exporter/awss3exporter/go.mod | 22 ++- exporter/awss3exporter/go.sum | 49 ++++- .../internal/upload/partition.go | 95 +++++++++ .../internal/upload/partition_test.go | 186 ++++++++++++++++++ .../awss3exporter/internal/upload/writer.go | 81 ++++++++ .../internal/upload/writer_test.go | 158 +++++++++++++++ exporter/awss3exporter/s3_writer.go | 158 +++++---------- exporter/awss3exporter/s3_writer_test.go | 180 +++++------------ 12 files changed, 723 insertions(+), 278 deletions(-) create mode 100644 exporter/awss3exporter/internal/upload/partition.go create mode 100644 exporter/awss3exporter/internal/upload/partition_test.go create mode 100644 exporter/awss3exporter/internal/upload/writer.go create mode 100644 exporter/awss3exporter/internal/upload/writer_test.go diff --git a/exporter/awss3exporter/config.go b/exporter/awss3exporter/config.go index 0db41271ff09..4514d2bb7643 100644 --- a/exporter/awss3exporter/config.go +++ b/exporter/awss3exporter/config.go @@ -14,16 +14,28 @@ import ( // S3UploaderConfig contains aws s3 uploader related config to controls things // like bucket, prefix, batching, connections, retries, etc. type S3UploaderConfig struct { - Region string `mapstructure:"region"` - S3Bucket string `mapstructure:"s3_bucket"` - S3Prefix string `mapstructure:"s3_prefix"` - S3Partition string `mapstructure:"s3_partition"` - FilePrefix string `mapstructure:"file_prefix"` - Endpoint string `mapstructure:"endpoint"` - RoleArn string `mapstructure:"role_arn"` - S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` - DisableSSL bool `mapstructure:"disable_ssl"` - Compression configcompression.Type `mapstructure:"compression"` + Region string `mapstructure:"region"` + // S3Bucket is the bucket name to be uploaded to. + S3Bucket string `mapstructure:"s3_bucket"` + // S3Prefix is the key (directory) prefix to written to inside the bucket + S3Prefix string `mapstructure:"s3_prefix"` + // S3Partition is used to provide the rollup on how data is written. + // Valid values are: [hour,minute] + S3Partition string `mapstructure:"s3_partition"` + // FilePrefix is the filename prefix used for the file to avoid any potential collisions. + FilePrefix string `mapstructure:"file_prefix"` + // Endpoint is the URL used for communicated with S3. + Endpoint string `mapstructure:"endpoint"` + // RoleArn is the role policy to use when interacting with S3 + RoleArn string `mapstructure:"role_arn"` + // S3ForcePathStyle sets the value for force path style. + S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` + // DisableSLL forces communication to happen via HTTP instead of HTTPS. + DisableSSL bool `mapstructure:"disable_ssl"` + // Compression sets the algorithm used to process the payload + // before uploading to S3. + // Valid values are: `gzip` or no value set. + Compression configcompression.Type `mapstructure:"compression"` } type MarshalerType string diff --git a/exporter/awss3exporter/exporter.go b/exporter/awss3exporter/exporter.go index b3e84aaa7af8..016d91fc49af 100644 --- a/exporter/awss3exporter/exporter.go +++ b/exporter/awss3exporter/exporter.go @@ -7,6 +7,7 @@ import ( "context" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -18,23 +19,26 @@ import ( type s3Exporter struct { config *Config - dataWriter dataWriter + signalType string + uploader upload.Manager logger *zap.Logger marshaler marshaler } -func newS3Exporter(config *Config, +func newS3Exporter( + config *Config, + signalType string, params exporter.Settings, ) *s3Exporter { s3Exporter := &s3Exporter{ config: config, - dataWriter: &s3Writer{}, + signalType: signalType, logger: params.Logger, } return s3Exporter } -func (e *s3Exporter) start(_ context.Context, host component.Host) error { +func (e *s3Exporter) start(ctx context.Context, host component.Host) error { var m marshaler var err error if e.config.Encoding != nil { @@ -48,6 +52,12 @@ func (e *s3Exporter) start(_ context.Context, host component.Host) error { } e.marshaler = m + + up, err := newUploadManager(ctx, e.config, e.signalType, m.format()) + if err != nil { + return err + } + e.uploader = up return nil } @@ -61,7 +71,7 @@ func (e *s3Exporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) err return err } - return e.dataWriter.writeBuffer(ctx, buf, e.config, "metrics", e.marshaler.format()) + return e.uploader.Upload(ctx, buf) } func (e *s3Exporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error { @@ -70,7 +80,7 @@ func (e *s3Exporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error { return err } - return e.dataWriter.writeBuffer(ctx, buf, e.config, "logs", e.marshaler.format()) + return e.uploader.Upload(ctx, buf) } func (e *s3Exporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { @@ -79,5 +89,5 @@ func (e *s3Exporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) er return err } - return e.dataWriter.writeBuffer(ctx, buf, e.config, "traces", e.marshaler.format()) + return e.uploader.Upload(ctx, buf) } diff --git a/exporter/awss3exporter/exporter_test.go b/exporter/awss3exporter/exporter_test.go index 8c06efb5acf4..814f9a280b2f 100644 --- a/exporter/awss3exporter/exporter_test.go +++ b/exporter/awss3exporter/exporter_test.go @@ -18,7 +18,7 @@ type TestWriter struct { t *testing.T } -func (testWriter *TestWriter) writeBuffer(_ context.Context, buf []byte, _ *Config, _ string, _ string) error { +func (testWriter *TestWriter) Upload(_ context.Context, buf []byte) error { assert.Equal(testWriter.t, testLogs, buf) return nil } @@ -33,10 +33,10 @@ func getTestLogs(tb testing.TB) plog.Logs { func getLogExporter(t *testing.T) *s3Exporter { marshaler, _ := newMarshaler("otlp_json", zap.NewNop()) exporter := &s3Exporter{ - config: createDefaultConfig().(*Config), - dataWriter: &TestWriter{t}, - logger: zap.NewNop(), - marshaler: marshaler, + config: createDefaultConfig().(*Config), + uploader: &TestWriter{t}, + logger: zap.NewNop(), + marshaler: marshaler, } return exporter } diff --git a/exporter/awss3exporter/factory.go b/exporter/awss3exporter/factory.go index 75964280fca7..da343d63ba0a 100644 --- a/exporter/awss3exporter/factory.go +++ b/exporter/awss3exporter/factory.go @@ -39,7 +39,7 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Logs, error) { - s3Exporter := newS3Exporter(config.(*Config), params) + s3Exporter := newS3Exporter(config.(*Config), "logs", params) return exporterhelper.NewLogs(ctx, params, config, @@ -51,7 +51,7 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Metrics, error) { - s3Exporter := newS3Exporter(config.(*Config), params) + s3Exporter := newS3Exporter(config.(*Config), "metrics", params) if config.(*Config).MarshalerName == SumoIC { return nil, fmt.Errorf("metrics are not supported by sumo_ic output format") @@ -67,7 +67,7 @@ func createTracesExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Traces, error) { - s3Exporter := newS3Exporter(config.(*Config), params) + s3Exporter := newS3Exporter(config.(*Config), "traces", params) if config.(*Config).MarshalerName == SumoIC { return nil, fmt.Errorf("traces are not supported by sumo_ic output format") diff --git a/exporter/awss3exporter/go.mod b/exporter/awss3exporter/go.mod index d1b7403b5268..df6e004b42eb 100644 --- a/exporter/awss3exporter/go.mod +++ b/exporter/awss3exporter/go.mod @@ -3,8 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3e go 1.22.0 require ( - github.com/aws/aws-sdk-go v1.55.5 + github.com/aws/aws-sdk-go-v2 v1.32.6 + github.com/aws/aws-sdk-go-v2/config v1.28.6 + github.com/aws/aws-sdk-go-v2/credentials v1.17.47 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.43 + github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0 + github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 github.com/stretchr/testify v1.10.0 + github.com/tilinna/clock v1.1.0 go.opentelemetry.io/collector/component v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/config/configcompression v1.21.0 @@ -20,6 +26,19 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect + github.com/aws/smithy-go v1.22.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -34,7 +53,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect diff --git a/exporter/awss3exporter/go.sum b/exporter/awss3exporter/go.sum index 90c6158d2d13..25efe375f496 100644 --- a/exporter/awss3exporter/go.sum +++ b/exporter/awss3exporter/go.sum @@ -1,5 +1,41 @@ -github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= -github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4= +github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= +github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo= +github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.43 h1:iLdpkYZ4cXIQMO7ud+cqMWR1xK5ESbt1rvN77tRi1BY= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.43/go.mod h1:OgbsKPAswXDd5kxnR4vZov69p3oYjbvUyIRBAAV0y9o= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 h1:ZntTCl5EsYnhN/IygQEUugpdwbhdkom9uHcbCftiGgA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 h1:r67ps7oHCYnflpgDy2LZU0MAQtQbYIOqNNnqGO6xQkE= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25/go.mod h1:GrGY+Q4fIokYLtjCVB/aFfCVL6hhGUFl8inD18fDalE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 h1:HCpPsWqmYQieU7SS6E9HXfdAMSud0pteVXieJmcpIRI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6/go.mod h1:ngUiVRCco++u+soRRVBIvBZxSMMvOVMXA4PJ36JLfSw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 h1:50+XsN70RS7dwJ2CkVNXzj7U2L1HKP8nqTd3XWEXBN4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.6 h1:BbGDtTi0T1DYlmjBiCr/le3wzhA37O8QTC5/Ab8+EXk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.6/go.mod h1:hLMJt7Q8ePgViKupeymbqI0la+t9/iYFBjxQCFwuAwI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0 h1:nyuzXooUNJexRT0Oy0UQY6AhOzxPxhtt4DcBIHyCnmw= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0/go.mod h1:sT/iQz8JK3u/5gZkT+Hmr7GzVZehUMkRZpOaAwYXeGY= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 h1:s4074ZO1Hk8qv65GqNXqDjmkf4HSQqJukaLuuW0TpDA= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8= +github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -41,10 +77,6 @@ github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKe github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -105,6 +137,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tilinna/clock v1.1.0 h1:6IQQQCo6KoBxVudv6gwtY8o4eDfhHo8ojA5dP0MfhSs= +github.com/tilinna/clock v1.1.0/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= @@ -317,8 +351,5 @@ google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/exporter/awss3exporter/internal/upload/partition.go b/exporter/awss3exporter/internal/upload/partition.go new file mode 100644 index 000000000000..0d1de8aa5d83 --- /dev/null +++ b/exporter/awss3exporter/internal/upload/partition.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package upload // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload" + +import ( + "fmt" + "math/rand/v2" + "strconv" + "time" + + "go.opentelemetry.io/collector/config/configcompression" +) + +var ( + compressionFileExtensions = map[configcompression.Type]string{ + configcompression.TypeGzip: ".gz", + } +) + +type PartitionKeyBuilder struct { + // PartitionPrefix defines the S3 directory (key) + // prefix used to write the file + PartitionPrefix string + // PartitionTruncation is used to truncate values into + // different time buckets. + // Currently hourly or minutely is supported + PartitionTruncation string + // FilePrefix is used to define the prefix of the file written + // to the directory in S3. + FilePrefix string + // FileFormat defines what encoding was used to write + // the content to s3 + FileFormat string + // Metadata provides additional details regarding the file + // Expected to be one of "metrics", "traces", or "logs" + Metadata string + // Compression defines algorithm used on the + // body before uploaded. + Compression configcompression.Type + // UniqueKeyFunc allows for overwritting the default behaviour of + // generating a new unique string to avoid collosions on file upload + // across many different instances. + // + // TODO: Expose the ability to config additional UniqueKeyField via config + UniqueKeyFunc func() string +} + +func (pki *PartitionKeyBuilder) Build(ts time.Time) string { + return pki.bucketKeyPrefix(ts) + "/" + pki.fileName() +} + +func (pki *PartitionKeyBuilder) bucketKeyPrefix(ts time.Time) string { + key := fmt.Sprintf("year=%d/month=%02d/day=%02d/hour=%02d", ts.Year(), ts.Month(), ts.Day(), ts.Hour()) + + switch pki.PartitionTruncation { + case "minute": + key += "/" + fmt.Sprintf("minute=%02d", ts.Minute()) + default: + // Nothing to do, key defaults to hourly + } + + return pki.PartitionPrefix + "/" + key +} + +func (pki *PartitionKeyBuilder) fileName() string { + var ( + suffix string + ) + + if pki.FileFormat != "" { + suffix = "." + pki.FileFormat + } + + if ext, ok := compressionFileExtensions[pki.Compression]; ok { + suffix += ext + } + + return pki.FilePrefix + pki.Metadata + "_" + pki.uniqueKey() + suffix +} + +func (pki *PartitionKeyBuilder) uniqueKey() string { + if pki.UniqueKeyFunc != nil { + return pki.UniqueKeyFunc() + } + + // This follows the original "uniqueness" algorithm + // to avoid collosions on file uploads across different nodes. + const ( + uniqueValues = 999999999 + minOffset = 100000000 + ) + + return strconv.Itoa(minOffset + rand.IntN(uniqueValues-minOffset)) +} diff --git a/exporter/awss3exporter/internal/upload/partition_test.go b/exporter/awss3exporter/internal/upload/partition_test.go new file mode 100644 index 000000000000..80eb7a80d82f --- /dev/null +++ b/exporter/awss3exporter/internal/upload/partition_test.go @@ -0,0 +1,186 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package upload + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/config/configcompression" +) + +func TestPartitionKeyInputsNewPartitionKey(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + inputs *PartitionKeyBuilder + expect string + }{ + { + name: "empty values", + inputs: &PartitionKeyBuilder{ + UniqueKeyFunc: func() string { + return "fixed" + }, + }, + expect: "/year=2024/month=01/day=24/hour=06/_fixed", + }, + { + name: "no compression set", + inputs: &PartitionKeyBuilder{ + PartitionPrefix: "/telemetry", + PartitionTruncation: "minute", + FilePrefix: "signal-output-", + Metadata: "service-01_pod2", + FileFormat: "metrics", + UniqueKeyFunc: func() string { + return "fixed" + }, + }, + expect: "/telemetry/year=2024/month=01/day=24/hour=06/minute=40/signal-output-service-01_pod2_fixed.metrics", + }, + { + name: "gzip compression set", + inputs: &PartitionKeyBuilder{ + PartitionPrefix: "/telemetry", + PartitionTruncation: "minute", + FilePrefix: "signal-output-", + Metadata: "service-01_pod2", + FileFormat: "metrics", + Compression: configcompression.TypeGzip, + UniqueKeyFunc: func() string { + return "fixed" + }, + }, + expect: "/telemetry/year=2024/month=01/day=24/hour=06/minute=40/signal-output-service-01_pod2_fixed.metrics.gz", + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ts := time.Date(2024, 01, 24, 6, 40, 20, 0, time.Local) + + assert.Equal(t, tc.expect, tc.inputs.Build(ts), "Must match the expected value") + }) + } +} + +func TestPartitionKeyInputsBucketPrefix(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + inputs *PartitionKeyBuilder + expect string + }{ + { + name: "no values provided", + inputs: &PartitionKeyBuilder{}, + expect: "/year=2024/month=01/day=24/hour=06", + }, + { + name: "parition by minutes", + inputs: &PartitionKeyBuilder{ + PartitionTruncation: "minute", + }, + expect: "/year=2024/month=01/day=24/hour=06/minute=40", + }, + { + name: "unknown partition trunction value", + inputs: &PartitionKeyBuilder{ + PartitionTruncation: "weekly", + }, + expect: "/year=2024/month=01/day=24/hour=06", + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ts := time.Date(2024, 01, 24, 6, 40, 20, 0, time.Local) + + assert.Equal(t, tc.expect, tc.inputs.bucketKeyPrefix(ts), "Must match the expected partition key") + }) + } +} + +func TestPartitionKeyInputsFilename(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + inputs *PartitionKeyBuilder + expect string + }{ + { + name: "no values provided", + inputs: &PartitionKeyBuilder{ + UniqueKeyFunc: func() string { + return "buzz" + }, + }, + expect: "_buzz", + }, + { + name: "no compression provided", + inputs: &PartitionKeyBuilder{ + FilePrefix: "collector-capture-", + FileFormat: "metrics", + Metadata: "service-01_pod1", + UniqueKeyFunc: func() string { + return "buzz" + }, + }, + expect: "collector-capture-service-01_pod1_buzz.metrics", + }, + { + name: "valid compression set", + inputs: &PartitionKeyBuilder{ + FilePrefix: "collector-capture-", + FileFormat: "metrics", + Metadata: "service-01_pod1", + Compression: configcompression.TypeGzip, + UniqueKeyFunc: func() string { + return "buzz" + }, + }, + expect: "collector-capture-service-01_pod1_buzz.metrics.gz", + }, + { + name: "invalid compression set", + inputs: &PartitionKeyBuilder{ + FilePrefix: "collector-capture-", + FileFormat: "metrics", + Metadata: "service-01_pod1", + Compression: configcompression.Type("foo"), + UniqueKeyFunc: func() string { + return "buzz" + }, + }, + expect: "collector-capture-service-01_pod1_buzz.metrics", + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + assert.Equal(t, tc.expect, tc.inputs.fileName(), "Must match the expected value") + }) + } +} + +func TestPartitionKeyInputsUniqueKey(t *testing.T) { + t.Parallel() + + // This test to is to help validate that a unique key + // is not repeated + + seen := make(map[string]struct{}) + for i := 0; i < 500; i++ { + uv := (&PartitionKeyBuilder{}).uniqueKey() + _, ok := seen[uv] + assert.False(t, ok, "Must not have repeated parition key %q", uv) + seen[uv] = struct{}{} + } +} diff --git a/exporter/awss3exporter/internal/upload/writer.go b/exporter/awss3exporter/internal/upload/writer.go new file mode 100644 index 000000000000..bbe6a10ff0b8 --- /dev/null +++ b/exporter/awss3exporter/internal/upload/writer.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package upload // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload" + +import ( + "bytes" + "compress/gzip" + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/tilinna/clock" + "go.opentelemetry.io/collector/config/configcompression" +) + +type Manager interface { + Upload(ctx context.Context, data []byte) error +} + +type s3manager struct { + bucket string + builder *PartitionKeyBuilder + uploader *manager.Uploader +} + +var _ Manager = (*s3manager)(nil) + +func NewS3Manager(bucket string, builder *PartitionKeyBuilder, service *s3.Client) Manager { + return &s3manager{ + bucket: bucket, + builder: builder, + uploader: manager.NewUploader(service), + } +} + +func (sw *s3manager) Upload(ctx context.Context, data []byte) error { + if len(data) == 0 { + return nil + } + + content, err := sw.contentBuffer(data) + if err != nil { + return err + } + + encoding := "" + if sw.builder.Compression.IsCompressed() { + encoding = string(sw.builder.Compression) + } + + now := clock.Now(ctx) + + _, err = sw.uploader.Upload(ctx, &s3.PutObjectInput{ + Bucket: aws.String(sw.bucket), + Key: aws.String(sw.builder.Build(now)), + Body: content, + ContentEncoding: aws.String(encoding), + }) + + return err +} + +func (sw *s3manager) contentBuffer(raw []byte) (*bytes.Buffer, error) { + switch sw.builder.Compression { + case configcompression.TypeGzip: + content := bytes.NewBuffer(nil) + + zipper := gzip.NewWriter(content) + if _, err := zipper.Write(raw); err != nil { + return nil, err + } + if err := zipper.Close(); err != nil { + return nil, err + } + + return content, nil + } + return bytes.NewBuffer(raw), nil +} diff --git a/exporter/awss3exporter/internal/upload/writer_test.go b/exporter/awss3exporter/internal/upload/writer_test.go new file mode 100644 index 000000000000..fc27c50302ad --- /dev/null +++ b/exporter/awss3exporter/internal/upload/writer_test.go @@ -0,0 +1,158 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package upload + +import ( + "compress/gzip" + "context" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tilinna/clock" + "go.opentelemetry.io/collector/config/configcompression" +) + +func TestNewS3Manager(t *testing.T) { + t.Parallel() + + sm := NewS3Manager( + "my-bucket", + &PartitionKeyBuilder{}, + s3.New(s3.Options{}), + ) + + assert.NotNil(t, sm, "Must have a valid client returned") +} + +func TestS3ManagerUpload(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + handler func(t *testing.T) http.Handler + compression configcompression.Type + data []byte + errVal string + }{ + { + name: "successful upload", + handler: func(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.Copy(io.Discard, r.Body) + _ = r.Body.Close() + + assert.Equal( + t, + "/my-bucket/telemetry/year=2024/month=01/day=10/hour=10/minute=30/signal-data-noop_random.metrics", + r.URL.Path, + "Must match the expected path", + ) + }) + }, + compression: configcompression.Type(""), + data: []byte("hello world"), + errVal: "", + }, + { + name: "successful compression upload", + handler: func(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal( + t, + "/my-bucket/telemetry/year=2024/month=01/day=10/hour=10/minute=30/signal-data-noop_random.metrics.gz", + r.URL.Path, + "Must match the expected path", + ) + + gr, err := gzip.NewReader(r.Body) + require.NoError(t, err, "Must not error creating gzip reader") + + data, err := io.ReadAll(gr) + assert.Equal(t, []byte("hello world"), data, "Must match the expected data") + require.NoError(t, err, "Must not error reading data from reader") + + _ = gr.Close() + _ = r.Body.Close() + }) + }, + compression: configcompression.TypeGzip, + data: []byte("hello world"), + errVal: "", + }, + { + name: "no data upload", + handler: func(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.Copy(io.Discard, r.Body) + _ = r.Body.Close() + + assert.Fail(t, "Must not call handler when no data is provided") + w.WriteHeader(http.StatusBadRequest) + }) + }, + data: nil, + errVal: "", + }, + { + name: "failed upload", + handler: func(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.Copy(io.Discard, r.Body) + _ = r.Body.Close() + + http.Error(w, "Invalid ARN provided", http.StatusUnauthorized) + }) + }, + data: []byte("good payload"), + errVal: "operation error S3: PutObject, https response error StatusCode: 401, RequestID: , HostID: , api error Unauthorized: Unauthorized", + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + s := httptest.NewServer(tc.handler(t)) + t.Cleanup(s.Close) + + sm := NewS3Manager( + "my-bucket", + &PartitionKeyBuilder{ + PartitionPrefix: "telemetry", + PartitionTruncation: "minute", + FilePrefix: "signal-data-", + Metadata: "noop", + FileFormat: "metrics", + Compression: tc.compression, + UniqueKeyFunc: func() string { + return "random" + }, + }, + s3.New(s3.Options{ + BaseEndpoint: aws.String(s.URL), + Region: "local", + }), + ) + + // Using a mocked virtual clock to fix the timestamp used + // to reduce the potential of flakey tests + mc := clock.NewMock(time.Date(2024, 01, 10, 10, 30, 40, 100, time.Local)) + + err := sm.Upload( + clock.Context(context.Background(), mc), + tc.data, + ) + if tc.errVal != "" { + assert.EqualError(t, err, tc.errVal, "Must match the expected error") + } else { + assert.NoError(t, err, "Must not have return an error") + } + }) + } +} diff --git a/exporter/awss3exporter/s3_writer.go b/exporter/awss3exporter/s3_writer.go index 13667f38a074..e85e755f3ff9 100644 --- a/exporter/awss3exporter/s3_writer.go +++ b/exporter/awss3exporter/s3_writer.go @@ -4,129 +4,71 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter" import ( - "bytes" - "compress/gzip" "context" - "fmt" - "math/rand" - "strconv" - "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials/stscreds" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "go.opentelemetry.io/collector/config/configcompression" -) - -type s3Writer struct{} + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials/stscreds" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sts" -// generate the s3 time key based on partition configuration -func getTimeKey(time time.Time, partition string) string { - var timeKey string - year, month, day := time.Date() - hour, minute, _ := time.Clock() - - if partition == "hour" { - timeKey = fmt.Sprintf("year=%d/month=%02d/day=%02d/hour=%02d", year, month, day, hour) - } else { - timeKey = fmt.Sprintf("year=%d/month=%02d/day=%02d/hour=%02d/minute=%02d", year, month, day, hour, minute) - } - return timeKey -} - -func randomInRange(low, hi int) int { - return low + rand.Intn(hi-low) -} - -func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileFormat string, compression configcompression.Type) string { - timeKey := getTimeKey(time, partition) - randomID := randomInRange(100000000, 999999999) - suffix := "" - if fileFormat != "" { - suffix = "." + fileFormat - } - - s3Key := keyPrefix + "/" + timeKey + "/" + filePrefix + metadata + "_" + strconv.Itoa(randomID) + suffix - - // add ".gz" extension to files if compression is enabled - if compression == configcompression.TypeGzip { - s3Key += ".gz" - } + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload" +) - return s3Key -} +func newUploadManager( + ctx context.Context, + conf *Config, + metadata string, + format string, +) (upload.Manager, error) { + configOpts := []func(*config.LoadOptions) error{} -func getSessionConfig(config *Config) *aws.Config { - sessionConfig := &aws.Config{ - Region: aws.String(config.S3Uploader.Region), - S3ForcePathStyle: &config.S3Uploader.S3ForcePathStyle, - DisableSSL: &config.S3Uploader.DisableSSL, + if region := conf.S3Uploader.Region; region != "" { + configOpts = append(configOpts, config.WithRegion(region)) } - endpoint := config.S3Uploader.Endpoint - if endpoint != "" { - sessionConfig.Endpoint = aws.String(endpoint) + cfg, err := config.LoadDefaultConfig(ctx, configOpts...) + if err != nil { + return nil, err } - return sessionConfig -} - -func getSession(config *Config, sessionConfig *aws.Config) (*session.Session, error) { - sess, err := session.NewSession(sessionConfig) - - if config.S3Uploader.RoleArn != "" { - credentials := stscreds.NewCredentials(sess, config.S3Uploader.RoleArn) - sess.Config.Credentials = credentials + s3Opts := []func(*s3.Options){ + func(o *s3.Options) { + o.EndpointOptions = s3.EndpointResolverOptions{ + DisableHTTPS: conf.S3Uploader.DisableSSL, + } + o.UsePathStyle = conf.S3Uploader.S3ForcePathStyle + }, } - return sess, err -} - -func (s3writer *s3Writer) writeBuffer(_ context.Context, buf []byte, config *Config, metadata string, format string) error { - now := time.Now() - key := getS3Key(now, - config.S3Uploader.S3Prefix, config.S3Uploader.S3Partition, - config.S3Uploader.FilePrefix, metadata, format, config.S3Uploader.Compression) - - encoding := "" - var reader *bytes.Reader - if config.S3Uploader.Compression == configcompression.TypeGzip { - // set s3 uploader content encoding to "gzip" - encoding = "gzip" - var gzipContents bytes.Buffer - - // create a gzip from data - gzipWriter := gzip.NewWriter(&gzipContents) - _, err := gzipWriter.Write(buf) - if err != nil { - return err - } - gzipWriter.Close() - - reader = bytes.NewReader(gzipContents.Bytes()) - } else { - // create a reader from data in memory - reader = bytes.NewReader(buf) + if conf.S3Uploader.Endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = aws.String((conf.S3Uploader.Endpoint)) + }) } - sessionConfig := getSessionConfig(config) - sess, err := getSession(config, sessionConfig) - if err != nil { - return err + if arn := conf.S3Uploader.RoleArn; arn != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.Credentials = stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), arn) + }) } - uploader := s3manager.NewUploader(sess) - - _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(config.S3Uploader.S3Bucket), - Key: aws.String(key), - Body: reader, - ContentEncoding: &encoding, - }) - if err != nil { - return err + if endpoint := conf.S3Uploader.Endpoint; endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = aws.String(endpoint) + }) } - return nil + return upload.NewS3Manager( + conf.S3Uploader.S3Bucket, + &upload.PartitionKeyBuilder{ + PartitionPrefix: conf.S3Uploader.S3Prefix, + PartitionTruncation: conf.S3Uploader.S3Partition, + FilePrefix: conf.S3Uploader.FilePrefix, + Metadata: metadata, + FileFormat: format, + Compression: conf.S3Uploader.Compression, + }, + s3.NewFromConfig(cfg, s3Opts...), + ), nil } diff --git a/exporter/awss3exporter/s3_writer_test.go b/exporter/awss3exporter/s3_writer_test.go index cdd5e1f025e3..945790562097 100644 --- a/exporter/awss3exporter/s3_writer_test.go +++ b/exporter/awss3exporter/s3_writer_test.go @@ -4,145 +4,57 @@ package awss3exporter import ( - "regexp" + "context" "testing" - "time" - "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configcompression" ) -func TestS3TimeKey(t *testing.T) { - const layout = "2006-01-02" - - tm, err := time.Parse(layout, "2022-06-05") - timeKey := getTimeKey(tm, "hour") - - assert.NoError(t, err) - require.NotNil(t, tm) - assert.Equal(t, "year=2022/month=06/day=05/hour=00", timeKey) - - timeKey = getTimeKey(tm, "minute") - assert.Equal(t, "year=2022/month=06/day=05/hour=00/minute=00", timeKey) -} - -func TestS3Key(t *testing.T) { - const layout = "2006-01-02" - - tm, err := time.Parse(layout, "2022-06-05") - - assert.NoError(t, err) - require.NotNil(t, tm) - - re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json`) - s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "") - matched := re.MatchString(s3Key) - assert.True(t, matched) -} - -func TestS3KeyEmptyFileFormat(t *testing.T) { - const layout = "2006-01-02" - - tm, err := time.Parse(layout, "2022-06-05") - - assert.NoError(t, err) - require.NotNil(t, tm) - - re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+)`) - s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "", "") - matched := re.MatchString(s3Key) - assert.True(t, matched) -} - -func TestS3KeyOfCompressedFile(t *testing.T) { - const layout = "2006-01-02" - - tm, err := time.Parse(layout, "2022-06-05") - - assert.NoError(t, err) - require.NotNil(t, tm) - - re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json.gz`) - s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "gzip") - matched := re.MatchString(s3Key) - assert.True(t, matched) -} - -func TestS3KeyOfCompressedFileEmptyFileFormat(t *testing.T) { - const layout = "2006-01-02" - - tm, err := time.Parse(layout, "2022-06-05") - - assert.NoError(t, err) - require.NotNil(t, tm) - - re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).gz`) - s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "", "gzip") - matched := re.MatchString(s3Key) - assert.True(t, matched) -} - -func TestGetSessionConfigWithEndpoint(t *testing.T) { - const endpoint = "https://endpoint.com" - const region = "region" - config := &Config{ - S3Uploader: S3UploaderConfig{ - Region: region, - Endpoint: endpoint, - }, - } - sessionConfig := getSessionConfig(config) - assert.Equal(t, sessionConfig.Endpoint, aws.String(endpoint)) - assert.Equal(t, sessionConfig.Region, aws.String(region)) -} - -func TestGetSessionConfigNoEndpoint(t *testing.T) { - const region = "region" - config := &Config{ - S3Uploader: S3UploaderConfig{ - Region: region, - }, - } - sessionConfig := getSessionConfig(config) - assert.Empty(t, sessionConfig.Endpoint) - assert.Equal(t, sessionConfig.Region, aws.String(region)) -} - -func TestGetSessionConfigWithRoleArn(t *testing.T) { - const region = "region" - const roleArn = "arn:aws:iam::12345:role/s3-exportation-role" - config := &Config{ - S3Uploader: S3UploaderConfig{ - Region: region, - RoleArn: roleArn, +func TestNewUploadManager(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + conf *Config + errVal string + }{ + { + name: "valid configuration", + conf: &Config{ + S3Uploader: S3UploaderConfig{ + Region: "local", + S3Bucket: "my-awesome-bucket", + S3Prefix: "opentelemetry", + S3Partition: "hour", + FilePrefix: "ingested-data-", + Endpoint: "localhost", + RoleArn: "arn:aws:iam::123456789012:my-awesome-user", + S3ForcePathStyle: true, + DisableSSL: true, + Compression: configcompression.TypeGzip, + }, + }, + errVal: "", }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + sm, err := newUploadManager( + context.Background(), + tc.conf, + "metrics", + "otlp", + ) + + if tc.errVal != "" { + assert.Nil(t, sm, "Must not have a valid s3 upload manager") + assert.EqualError(t, err, tc.errVal, "Must match the expected error") + } else { + assert.NotNil(t, sm, "Must have a valid manager") + assert.NoError(t, err, "Must not error when creating client") + } + }) } - - sessionConfig := getSessionConfig(config) - sess, err := getSession(config, sessionConfig) - - creds, _ := sess.Config.Credentials.Get() - - assert.NoError(t, err) - assert.Equal(t, sessionConfig.Region, aws.String(region)) - assert.Equal(t, "AssumeRoleProvider", creds.ProviderName) -} - -func TestGetSessionConfigWithoutRoleArn(t *testing.T) { - const region = "region" - config := &Config{ - S3Uploader: S3UploaderConfig{ - Region: region, - }, - } - - sessionConfig := getSessionConfig(config) - sess, err := getSession(config, sessionConfig) - - creds, _ := sess.Config.Credentials.Get() - - assert.NoError(t, err) - assert.Equal(t, sessionConfig.Region, aws.String(region)) - assert.NotEqual(t, "AssumeRoleProvider", creds.ProviderName) }