This repository has been archived by the owner on Nov 28, 2022. It is now read-only.

Backport upgrade event tracing (knative#6342) (#1710)
* Wathola Tracing for upgrade tests (knative#6219)

* wathola exposing trace information

* Run update-deps.sh

* Fix license

* Fix import

* Ensure backwards compatibility

* Assert ParentID not nil in test

* Separate old and new events sender APIs

* Make loggingCfg in client private

* Wait only 1 second for flushing tracing info

The Reporter is created with a default batch interval of 1 second, so it
should be enough to wait just 1 second, because the data is flushed every
second (see the flush-wait sketch after the commit message).

* Increase the sleep time to 1.5 seconds to be safe

* The ticker runs every 100 ms, so it can take up to 1100 ms until the buffer
  really flushes.

* Use Log.Fatal when tracing is not set up properly

* Increase the sleep time to 5 seconds and reference knative/pkg issue

* Process empty tracing config in test images (knative#6289)

* Print traces for missed events in upgrade tests (knative#6249)

* Upgrade tests reporting Trace information for missed events

* TMP: Induce missed event

* Revert "TMP: Induce missed event"

This reverts commit 2fec7c7.

* Report trace also for Duplicated events

* TMP: Induce missed event

* TMP: Simulate duplicate events

* Fix readme

* Unify path for duplicate and missed events

* Revert "TMP: Simulate duplicate events"

This reverts commit c126521.

* Revert "TMP: Induce missed event"

This reverts commit fcd9185.

* Do not fail upgrade tests if tracing is not configured (knative#6299)

* Do not fail upgrade tests if tracing is not configured

* TMP: Do not deploy Knative Monitoring

* Revert "TMP: Do not deploy Knative Monitoring"

This reverts commit 086a8f9.

* Limit the number of exported traces (knative#6329)

Exporting traces for a large number of events can exceed the timeout of
the whole test suite, leading to all upgrade tests being reported as
failed (see the capping sketch after the commit message).

* Cleanup Zipkin tracing only once in upgrade test suite (knative#6331)

* NPE fix (knative#6343)

Co-authored-by: Chris Suszynski <[email protected]>

Co-authored-by: Martin Gencur <[email protected]>
Co-authored-by: Chris Suszynski <[email protected]>
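
The flush-timing commits above amount to a fixed wait before the test image exits, so the Zipkin reporter can drain its span buffer. Below is a minimal sketch of that wait; the names and structure are illustrative, and the constants come from the commit messages rather than the wathola sources:

```go
package main

import "time"

// traceFlushWait approximates the worst case for the Zipkin reporter to
// flush buffered spans: a 1s batch interval plus its 100ms ticker period,
// padded to 5 seconds to be safe (per the commits above).
const traceFlushWait = 5 * time.Second

// waitForTraceFlush blocks long enough for batched spans to be exported
// before the process shuts down.
func waitForTraceFlush() {
    time.Sleep(traceFlushWait)
}

func main() {
    // ... send events and emit tracing spans ...
    waitForTraceFlush()
}
```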
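The trace-export cap from knative#6329 can likewise be pictured as a bounded loop; the cap value and helper names below are hypothetical, not the actual implementation:

```go
package main

import "fmt"

// maxExportedTraces is a hypothetical cap: writing one trace file per
// missed event is slow, so exporting stops once the cap is reached to
// keep the suite within its timeout.
const maxExportedTraces = 100

// exportTraces exports traces for at most maxExportedTraces missed events.
func exportTraces(missedSteps []int, export func(step int)) {
    for i, step := range missedSteps {
        if i >= maxExportedTraces {
            fmt.Printf("trace export capped, skipping %d remaining events\n", len(missedSteps)-i)
            return
        }
        export(step)
    }
}

func main() {
    exportTraces([]int{1, 2, 3}, func(step int) {
        fmt.Printf("exporting trace for step %d\n", step)
    })
}
```
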
3 people authored Apr 21, 2022
1 parent 19c485e commit d83ffda
Showing 29 changed files with 720 additions and 140 deletions.

test/config/monitoring/monitoring.yaml (6 changes: 5 additions & 1 deletion)

@@ -53,9 +53,13 @@ spec:
           fieldRef:
             apiVersion: v1
             fieldPath: metadata.namespace
+        - name: JAVA_OPTS
+          value: '-Xms128m -Xmx5G -XX:+ExitOnOutOfMemoryError'
+        - name: MEM_MAX_SPANS
+          value: '10000000'
         resources:
           limits:
-            memory: 1000Mi
+            memory: 6Gi
           requests:
             memory: 256Mi
 
test/lib/client.go (29 changes: 14 additions & 15 deletions)

@@ -42,7 +42,6 @@ import (
 
     eventing "knative.dev/eventing/pkg/client/clientset/versioned"
     "knative.dev/eventing/test/lib/duck"
-    ti "knative.dev/eventing/test/test_images"
 )
 
 // Client holds instances of interfaces for making requests to Knative.
@@ -61,8 +60,8 @@ type Client struct {
 
     podsCreated []string
 
-    tracingEnv corev1.EnvVar
-    loggingEnv *corev1.EnvVar
+    TracingCfg string
+    loggingCfg string
 
     cleanup func()
 }
@@ -105,12 +104,12 @@ func NewClient(namespace string, t *testing.T) (*Client, error) {
     client.EventListener = NewEventListener(client.Kube, client.Namespace, client.T.Logf)
     client.Cleanup(client.EventListener.Stop)
 
-    client.tracingEnv, err = getTracingConfig(client.Kube)
+    client.TracingCfg, err = getTracingConfig(client.Kube)
     if err != nil {
         return nil, err
     }
 
-    client.loggingEnv, err = getLoggingConfig(client.Kube)
+    client.loggingCfg, err = getLoggingConfig(client.Kube)
     if err != nil {
         t.Log("Cannot retrieve the logging config map: ", err)
     }
@@ -161,40 +160,40 @@ func getGenericResource(tm metav1.TypeMeta) runtime.Object {
     return &duckv1.KResource{}
 }
 
-func getTracingConfig(c kubernetes.Interface) (corev1.EnvVar, error) {
+func getTracingConfig(c kubernetes.Interface) (string, error) {
     cm, err := c.CoreV1().ConfigMaps(system.Namespace()).Get(context.Background(), configtracing.ConfigName, metav1.GetOptions{})
     if err != nil {
-        return corev1.EnvVar{}, fmt.Errorf("error while retrieving the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
+        return "", fmt.Errorf("error while retrieving the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
     }
 
     config, err := configtracing.NewTracingConfigFromConfigMap(cm)
     if err != nil {
-        return corev1.EnvVar{}, fmt.Errorf("error while parsing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
+        return "", fmt.Errorf("error while parsing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
     }
 
     configSerialized, err := configtracing.TracingConfigToJSON(config)
     if err != nil {
-        return corev1.EnvVar{}, fmt.Errorf("error while serializing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
+        return "", fmt.Errorf("error while serializing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
     }
 
-    return corev1.EnvVar{Name: ti.ConfigTracingEnv, Value: configSerialized}, nil
+    return configSerialized, nil
 }
 
-func getLoggingConfig(c kubernetes.Interface) (*corev1.EnvVar, error) {
+func getLoggingConfig(c kubernetes.Interface) (string, error) {
     cm, err := c.CoreV1().ConfigMaps(system.Namespace()).Get(context.Background(), logging.ConfigMapName(), metav1.GetOptions{})
     if err != nil {
-        return nil, fmt.Errorf("error while retrieving the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
+        return "", fmt.Errorf("error while retrieving the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
     }
 
     config, err := logging.NewConfigFromMap(cm.Data)
     if err != nil {
-        return nil, fmt.Errorf("error while parsing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
+        return "", fmt.Errorf("error while parsing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
     }
 
     configSerialized, err := logging.ConfigToJSON(config)
     if err != nil {
-        return nil, fmt.Errorf("error while serializing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
+        return "", fmt.Errorf("error while serializing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
     }
 
-    return &corev1.EnvVar{Name: ti.ConfigLoggingEnv, Value: configSerialized}, nil
+    return configSerialized, nil
 }
 
test/lib/creation.go (7 changes: 4 additions & 3 deletions)

@@ -41,6 +41,7 @@ import (
     "knative.dev/eventing/pkg/utils"
     "knative.dev/eventing/test/lib/duck"
     "knative.dev/eventing/test/lib/resources"
+    ti "knative.dev/eventing/test/test_images"
 )
 
 // TODO(chizhg): break this file into multiple files when it grows too large.
@@ -566,9 +567,9 @@ func (c *Client) CreateClusterRoleBindingOrFail(saName, crName, crbName string)
 
 func (c *Client) applyAdditionalEnv(pod *corev1.PodSpec) {
     for i := 0; i < len(pod.Containers); i++ {
-        pod.Containers[i].Env = append(pod.Containers[i].Env, c.tracingEnv)
-        if c.loggingEnv != nil {
-            pod.Containers[i].Env = append(pod.Containers[i].Env, *c.loggingEnv)
+        pod.Containers[i].Env = append(pod.Containers[i].Env, corev1.EnvVar{Name: ti.ConfigTracingEnv, Value: c.TracingCfg})
+        if c.loggingCfg != "" {
+            pod.Containers[i].Env = append(pod.Containers[i].Env, corev1.EnvVar{Name: ti.ConfigLoggingEnv, Value: c.loggingCfg})
         }
     }
 }
 
test/test_images/utils.go (12 changes: 3 additions & 9 deletions)

@@ -56,23 +56,17 @@ const (
     ConfigLoggingEnv = "K_CONFIG_LOGGING"
 )
 
-// ConfigureTracing can be used in test-images to configure tracing
+// ConfigureTracing can be used in test-images to configure tracing.
 func ConfigureTracing(logger *zap.SugaredLogger, serviceName string) error {
     tracingEnv := os.Getenv(ConfigTracingEnv)
-
-    if tracingEnv == "" {
-        return tracing.SetupStaticPublishing(logger, serviceName, config.NoopConfig())
-    }
-
     conf, err := config.JSONToTracingConfig(tracingEnv)
     if err != nil {
-        return err
+        logger.Warn("Error while trying to read the tracing config, using NoopConfig: ", err)
     }
-
     return tracing.SetupStaticPublishing(logger, serviceName, conf)
 }
 
-// ConfigureTracing can be used in test-images to configure tracing
+// ConfigureLogging can be used in test-images to configure logging.
 func ConfigureLogging(ctx context.Context, name string) context.Context {
     loggingEnv := os.Getenv(ConfigLoggingEnv)
     conf, err := logging.JSONToConfig(loggingEnv)
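
For context, a test image would call this helper once at startup and fail hard when tracing cannot be set up, matching the "Use Log.Fatal" commit above. A minimal sketch; the logger construction and the service name are illustrative:

```go
package main

import (
    "go.uber.org/zap"

    ti "knative.dev/eventing/test/test_images"
)

func main() {
    // Illustrative logger; real test images obtain theirs via ConfigureLogging.
    logger := zap.NewExample().Sugar()
    if err := ti.ConfigureTracing(logger, "wathola-sender"); err != nil {
        logger.Fatal("unable to set up tracing: ", err)
    }
    // ... run the test image's actual workload ...
}
```
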
test/upgrade/README.md (21 changes: 21 additions & 0 deletions)

@@ -119,3 +119,24 @@ struct can be influenced, by using `EVENTING_UPGRADE_TESTS_XXXXX` environmental
 variable prefix (using
 [kelseyhightower/envconfig](https://github.com/kelseyhightower/envconfig#usage)
 usage).
+
+#### Inspecting Zipkin traces for undelivered events
+
+When tracing is enabled in the `config-tracing` config map in the system namespace,
+the prober collects traces for undelivered events. The traces are exported as JSON
+files under the artifacts directory, one file per event. Step event traces are
+stored as `$ARTIFACTS/traces/missed-events/step-<step_number>.json`, and the
+finished event traces are stored as `$ARTIFACTS/traces/missed-events/finished.json`.
+
+Traces can be viewed as follows:
+- Start a Zipkin container on localhost:
+```
+$ docker run -d -p 9411:9411 ghcr.io/openzipkin/zipkin:2
+```
+- Send traces to the Zipkin endpoint:
+```
+$ curl -v -X POST localhost:9411/api/v2/spans \
+  -H 'Content-Type: application/json' \
+  -d @$ARTIFACTS/traces/missed-events/step-<step_number>.json
+```
+- View traces in the Zipkin UI at `http://localhost:9411/zipkin`.
 
test/upgrade/prober/config.toml (3 changes: 2 additions & 1 deletion)

@@ -1,6 +1,7 @@
 # logLevel = 'DEBUG'
+tracingConfig = '{{- .TracingConfig -}}'
 [sender]
 address = '{{- .Endpoint -}}'
 interval = {{ .Config.Interval.Nanoseconds }}
 [forwarder]
-target = 'http://wathola-receiver.{{- .Namespace -}}.svc.cluster.local'
+target = '{{- .ForwarderTarget -}}'
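
Once `compileTemplate` in configuration.go (next file) fills in these template variables, the rendered config map data might look roughly like the following; every value here is hypothetical:

```toml
# logLevel = 'DEBUG'
tracingConfig = '{"backend":"zipkin","debug":"false","sample-rate":"1","zipkin-endpoint":"http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"}'
[sender]
address = 'http://broker-ingress.knative-eventing.svc.cluster.local/my-namespace/sut-broker'
interval = 1000000000
[forwarder]
target = 'http://wathola-receiver.my-namespace.svc.cluster.local'
```
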
test/upgrade/prober/configuration.go (24 changes: 17 additions & 7 deletions)

@@ -18,7 +18,6 @@ package prober
 import (
     "bytes"
     "context"
-    "errors"
     "fmt"
     "io/ioutil"
     "path"
@@ -27,8 +26,11 @@ import (
     "time"
 
     "github.com/kelseyhightower/envconfig"
+    "github.com/pkg/errors"
     "knative.dev/eventing/test/lib/resources"
     "knative.dev/eventing/test/upgrade/prober/sut"
+    "knative.dev/eventing/test/upgrade/prober/wathola/forwarder"
+    "knative.dev/eventing/test/upgrade/prober/wathola/receiver"
     duckv1 "knative.dev/pkg/apis/duck/v1"
     pkgTest "knative.dev/pkg/test"
     pkgupgrade "knative.dev/pkg/test/upgrade"
@@ -49,6 +51,8 @@ const (
     Error DuplicateAction = "error"
 
     prefix = "eventing_upgrade_tests"
+
+    forwarderTargetFmt = "http://" + receiver.Name + ".%s.svc.cluster.local"
 )
 
 var (
@@ -141,9 +145,9 @@ func (p *prober) deployConfiguration() {
         Log:    p.log,
         Client: p.client,
     }
-    ref := resources.KnativeRefForService(receiverName, p.client.Namespace)
+    ref := resources.KnativeRefForService(receiver.Name, p.client.Namespace)
     if p.config.Serving.Use {
-        ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace)
+        ref = resources.KnativeRefForKservice(forwarder.Name, p.client.Namespace)
     }
     dest := duckv1.Destination{Ref: ref}
     s := p.config.SystemUnderTest
@@ -153,19 +157,20 @@ func (p *prober) deployConfiguration() {
             tr.Teardown(sc)
         }
     })
+
     p.deployConfigToml(endpoint)
 }
 
 func (p *prober) deployConfigToml(endpoint interface{}) {
     name := p.config.ConfigMapName
     p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name)
-    configData := p.compileTemplate(p.config.ConfigTemplate, endpoint)
+    configData := p.compileTemplate(p.config.ConfigTemplate, endpoint, p.client.TracingCfg)
     p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{
         p.config.ConfigFilename: configData,
     })
 }
 
-func (p *prober) compileTemplate(templateName string, endpoint interface{}) string {
+func (p *prober) compileTemplate(templateName string, endpoint interface{}, tracingConfig string) string {
     _, filename, _, _ := runtime.Caller(0)
     templateFilepath := path.Join(path.Dir(filename), templateName)
     templateBytes, err := ioutil.ReadFile(templateFilepath)
@@ -175,15 +180,20 @@ func (p *prober) compileTemplate(templateName string, endpoint interface{}) string {
     var buff bytes.Buffer
     data := struct {
         *Config
+        // Deprecated: use ForwarderTarget
         Namespace string
+        // Deprecated: use Endpoint
-        BrokerURL string
-        Endpoint  interface{}
+        BrokerURL       string
+        Endpoint        interface{}
+        TracingConfig   string
+        ForwarderTarget string
     }{
         p.config,
         p.client.Namespace,
         fmt.Sprintf("%v", endpoint),
         endpoint,
+        tracingConfig,
+        fmt.Sprintf(forwarderTargetFmt, p.client.Namespace),
     }
     p.ensureNoError(tmpl.Execute(&buff, data))
     return buff.String()
 
test/upgrade/prober/forwarder.go (25 changes: 11 additions & 14 deletions)

@@ -23,38 +23,35 @@ import (
     testlib "knative.dev/eventing/test/lib"
     "knative.dev/eventing/test/lib/duck"
     "knative.dev/eventing/test/lib/resources"
-)
-
-var (
-    forwarderName = "wathola-forwarder"
+    "knative.dev/eventing/test/upgrade/prober/wathola/forwarder"
 )
 
 func (p *prober) deployForwarder() {
-    p.log.Infof("Deploy forwarder knative service: %v", forwarderName)
+    p.log.Infof("Deploy forwarder knative service: %v", forwarder.Name)
     serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace)
-    service := p.forwarderKService(forwarderName, p.client.Namespace)
+    service := p.forwarderKService(forwarder.Name, p.client.Namespace)
     if _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}); err != nil {
         p.client.T.Fatal(err)
     }
 
     sc := p.servingClient()
-    testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarderName), func() error {
-        return duck.WaitForKServiceReady(sc, forwarderName, p.client.Namespace)
+    testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarder.Name), func() error {
+        return duck.WaitForKServiceReady(sc, forwarder.Name, p.client.Namespace)
     })
 
     if p.config.Serving.ScaleToZero {
-        testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarderName), func() error {
-            return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool {
+        testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarder.Name), func() error {
+            return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarder.Name, p.client.Namespace, func(scale int) bool {
                 return scale == 0
             })
         })
     }
 }
 
 func (p *prober) removeForwarder() {
-    p.log.Infof("Remove forwarder knative service: %v", forwarderName)
+    p.log.Infof("Remove forwarder knative service: %v", forwarder.Name)
     serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace)
-    err := serving.Delete(p.config.Ctx, forwarderName, metav1.DeleteOptions{})
+    err := serving.Delete(p.config.Ctx, forwarder.Name, metav1.DeleteOptions{})
     p.ensureNoError(err)
 }
@@ -73,8 +70,8 @@ func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstructured {
             "template": map[string]interface{}{
                 "spec": map[string]interface{}{
                     "containers": []map[string]interface{}{{
-                        "name":  forwarderName,
-                        "image": p.config.ImageResolver(forwarderName),
+                        "name":  forwarder.Name,
+                        "image": p.config.ImageResolver(forwarder.Name),
                         "volumeMounts": []map[string]interface{}{{
                             "name":      p.config.ConfigMapName,
                             "mountPath": p.config.ConfigMountPoint,