Skip to content

Commit

Permalink
Wathola Tracing for upgrade tests (#6219)
Browse files Browse the repository at this point in the history
* wathola exposing trace information

* Run update-deps.sh

* Fix license

* Fix import

* Ensure backwards compatibility

* Assert ParentID not nil in test

* Separate old and new events sender APIs

* Make loggingCfg in client private

* Wait only 1 second for flushing tracing info

The Reporter is created with a default batch interval 1 second. So, it
should be enough to wait just 1 second because the data is flushed every
1 second.

* Increase the sleep time to 1.5 seconds to be safe

* The ticker runs every 100ms so it could be 1100 ms until the buffer
really flushes.

* Use Log.Fatal when tracing is not set up properly

* Increase the sleep time to 5 seconds and reference knative/pkg issue
  • Loading branch information
mgencur authored Mar 29, 2022
1 parent eb2bfff commit 3890b39
Show file tree
Hide file tree
Showing 21 changed files with 407 additions and 108 deletions.
29 changes: 14 additions & 15 deletions test/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (

eventing "knative.dev/eventing/pkg/client/clientset/versioned"
"knative.dev/eventing/test/lib/duck"
ti "knative.dev/eventing/test/test_images"
)

// Client holds instances of interfaces for making requests to Knative.
Expand All @@ -61,8 +60,8 @@ type Client struct {

podsCreated []string

tracingEnv corev1.EnvVar
loggingEnv *corev1.EnvVar
TracingCfg string
loggingCfg string

cleanup func()
}
Expand Down Expand Up @@ -105,12 +104,12 @@ func NewClient(namespace string, t *testing.T) (*Client, error) {
client.EventListener = NewEventListener(client.Kube, client.Namespace, client.T.Logf)
client.Cleanup(client.EventListener.Stop)

client.tracingEnv, err = getTracingConfig(client.Kube)
client.TracingCfg, err = getTracingConfig(client.Kube)
if err != nil {
return nil, err
}

client.loggingEnv, err = getLoggingConfig(client.Kube)
client.loggingCfg, err = getLoggingConfig(client.Kube)
if err != nil {
t.Log("Cannot retrieve the logging config map: ", err)
}
Expand Down Expand Up @@ -161,40 +160,40 @@ func getGenericResource(tm metav1.TypeMeta) runtime.Object {
return &duckv1.KResource{}
}

func getTracingConfig(c kubernetes.Interface) (corev1.EnvVar, error) {
func getTracingConfig(c kubernetes.Interface) (string, error) {
cm, err := c.CoreV1().ConfigMaps(system.Namespace()).Get(context.Background(), configtracing.ConfigName, metav1.GetOptions{})
if err != nil {
return corev1.EnvVar{}, fmt.Errorf("error while retrieving the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
return "", fmt.Errorf("error while retrieving the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
}

config, err := configtracing.NewTracingConfigFromConfigMap(cm)
if err != nil {
return corev1.EnvVar{}, fmt.Errorf("error while parsing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
return "", fmt.Errorf("error while parsing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
}

configSerialized, err := configtracing.TracingConfigToJSON(config)
if err != nil {
return corev1.EnvVar{}, fmt.Errorf("error while serializing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
return "", fmt.Errorf("error while serializing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err))
}

return corev1.EnvVar{Name: ti.ConfigTracingEnv, Value: configSerialized}, nil
return configSerialized, nil
}

func getLoggingConfig(c kubernetes.Interface) (*corev1.EnvVar, error) {
func getLoggingConfig(c kubernetes.Interface) (string, error) {
cm, err := c.CoreV1().ConfigMaps(system.Namespace()).Get(context.Background(), logging.ConfigMapName(), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error while retrieving the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
return "", fmt.Errorf("error while retrieving the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
}

config, err := logging.NewConfigFromMap(cm.Data)
if err != nil {
return nil, fmt.Errorf("error while parsing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
return "", fmt.Errorf("error while parsing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
}

configSerialized, err := logging.ConfigToJSON(config)
if err != nil {
return nil, fmt.Errorf("error while serializing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
return "", fmt.Errorf("error while serializing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err))
}

return &corev1.EnvVar{Name: ti.ConfigLoggingEnv, Value: configSerialized}, nil
return configSerialized, nil
}
7 changes: 4 additions & 3 deletions test/lib/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"knative.dev/eventing/pkg/utils"
"knative.dev/eventing/test/lib/duck"
"knative.dev/eventing/test/lib/resources"
ti "knative.dev/eventing/test/test_images"
)

// TODO(chizhg): break this file into multiple files when it grows too large.
Expand Down Expand Up @@ -566,9 +567,9 @@ func (c *Client) CreateClusterRoleBindingOrFail(saName, crName, crbName string)

func (c *Client) applyAdditionalEnv(pod *corev1.PodSpec) {
for i := 0; i < len(pod.Containers); i++ {
pod.Containers[i].Env = append(pod.Containers[i].Env, c.tracingEnv)
if c.loggingEnv != nil {
pod.Containers[i].Env = append(pod.Containers[i].Env, *c.loggingEnv)
pod.Containers[i].Env = append(pod.Containers[i].Env, corev1.EnvVar{Name: ti.ConfigTracingEnv, Value: c.TracingCfg})
if c.loggingCfg != "" {
pod.Containers[i].Env = append(pod.Containers[i].Env, corev1.EnvVar{Name: ti.ConfigLoggingEnv, Value: c.loggingCfg})
}
}
}
Expand Down
1 change: 1 addition & 0 deletions test/upgrade/prober/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# logLevel = 'DEBUG'
tracingConfig = '{{- .TracingConfig -}}'
[sender]
address = '{{- .Endpoint -}}'
interval = {{ .Config.Interval.Nanoseconds }}
Expand Down
19 changes: 12 additions & 7 deletions test/upgrade/prober/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package prober
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"path"
Expand All @@ -27,8 +26,11 @@ import (
"time"

"github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/upgrade/prober/sut"
"knative.dev/eventing/test/upgrade/prober/wathola/forwarder"
"knative.dev/eventing/test/upgrade/prober/wathola/receiver"
duckv1 "knative.dev/pkg/apis/duck/v1"
pkgTest "knative.dev/pkg/test"
pkgupgrade "knative.dev/pkg/test/upgrade"
Expand Down Expand Up @@ -141,9 +143,9 @@ func (p *prober) deployConfiguration() {
Log: p.log,
Client: p.client,
}
ref := resources.KnativeRefForService(receiverName, p.client.Namespace)
ref := resources.KnativeRefForService(receiver.Name, p.client.Namespace)
if p.config.Serving.Use {
ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace)
ref = resources.KnativeRefForKservice(forwarder.Name, p.client.Namespace)
}
dest := duckv1.Destination{Ref: ref}
s := p.config.SystemUnderTest
Expand All @@ -153,19 +155,20 @@ func (p *prober) deployConfiguration() {
tr.Teardown(sc)
}
})

p.deployConfigToml(endpoint)
}

func (p *prober) deployConfigToml(endpoint interface{}) {
name := p.config.ConfigMapName
p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name)
configData := p.compileTemplate(p.config.ConfigTemplate, endpoint)
configData := p.compileTemplate(p.config.ConfigTemplate, endpoint, p.client.TracingCfg)
p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{
p.config.ConfigFilename: configData,
})
}

func (p *prober) compileTemplate(templateName string, endpoint interface{}) string {
func (p *prober) compileTemplate(templateName string, endpoint interface{}, tracingConfig string) string {
_, filename, _, _ := runtime.Caller(0)
templateFilepath := path.Join(path.Dir(filename), templateName)
templateBytes, err := ioutil.ReadFile(templateFilepath)
Expand All @@ -177,13 +180,15 @@ func (p *prober) compileTemplate(templateName string, endpoint interface{}) stri
*Config
Namespace string
// Deprecated: use Endpoint
BrokerURL string
Endpoint interface{}
BrokerURL string
Endpoint interface{}
TracingConfig string
}{
p.config,
p.client.Namespace,
fmt.Sprintf("%v", endpoint),
endpoint,
tracingConfig,
}
p.ensureNoError(tmpl.Execute(&buff, data))
return buff.String()
Expand Down
25 changes: 11 additions & 14 deletions test/upgrade/prober/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,35 @@ import (
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/duck"
"knative.dev/eventing/test/lib/resources"
)

var (
forwarderName = "wathola-forwarder"
"knative.dev/eventing/test/upgrade/prober/wathola/forwarder"
)

func (p *prober) deployForwarder() {
p.log.Infof("Deploy forwarder knative service: %v", forwarderName)
p.log.Infof("Deploy forwarder knative service: %v", forwarder.Name)
serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace)
service := p.forwarderKService(forwarderName, p.client.Namespace)
service := p.forwarderKService(forwarder.Name, p.client.Namespace)
if _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}); err != nil {
p.client.T.Fatal(err)
}

sc := p.servingClient()
testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarderName), func() error {
return duck.WaitForKServiceReady(sc, forwarderName, p.client.Namespace)
testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarder.Name), func() error {
return duck.WaitForKServiceReady(sc, forwarder.Name, p.client.Namespace)
})

if p.config.Serving.ScaleToZero {
testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarderName), func() error {
return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool {
testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarder.Name), func() error {
return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarder.Name, p.client.Namespace, func(scale int) bool {
return scale == 0
})
})
}
}

func (p *prober) removeForwarder() {
p.log.Infof("Remove forwarder knative service: %v", forwarderName)
p.log.Infof("Remove forwarder knative service: %v", forwarder.Name)
serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace)
err := serving.Delete(p.config.Ctx, forwarderName, metav1.DeleteOptions{})
err := serving.Delete(p.config.Ctx, forwarder.Name, metav1.DeleteOptions{})
p.ensureNoError(err)
}

Expand All @@ -73,8 +70,8 @@ func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstruc
"template": map[string]interface{}{
"spec": map[string]interface{}{
"containers": []map[string]interface{}{{
"name": forwarderName,
"image": p.config.ImageResolver(forwarderName),
"name": forwarder.Name,
"image": p.config.ImageResolver(forwarder.Name),
"volumeMounts": []map[string]interface{}{{
"name": p.config.ConfigMapName,
"mountPath": p.config.ConfigMountPoint,
Expand Down
25 changes: 11 additions & 14 deletions test/upgrade/prober/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ import (

testlib "knative.dev/eventing/test/lib"
watholaconfig "knative.dev/eventing/test/upgrade/prober/wathola/config"
)

var (
receiverName = "wathola-receiver"
"knative.dev/eventing/test/upgrade/prober/wathola/receiver"
)

func (p *prober) deployReceiver() {
Expand All @@ -38,22 +35,22 @@ func (p *prober) deployReceiver() {
}

func (p *prober) deployReceiverDeployment() {
p.log.Info("Deploy of receiver deployment: ", receiverName)
p.log.Info("Deploy of receiver deployment: ", receiver.Name)
deployment := p.createReceiverDeployment()
p.client.CreateDeploymentOrFail(deployment)

testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiverName), func() error {
testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiver.Name), func() error {
return pkgTest.WaitForDeploymentScale(
p.config.Ctx, p.client.Kube, receiverName, p.client.Namespace, 1,
p.config.Ctx, p.client.Kube, receiver.Name, p.client.Namespace, 1,
)
})
}

func (p *prober) deployReceiverService() {
p.log.Infof("Deploy of receiver service: %v", receiverName)
p.log.Infof("Deploy of receiver service: %v", receiver.Name)
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: receiverName,
Name: receiver.Name,
Namespace: p.client.Namespace,
},
Spec: corev1.ServiceSpec{
Expand All @@ -69,7 +66,7 @@ func (p *prober) deployReceiverService() {
},
},
Selector: map[string]string{
"app": receiverName,
"app": receiver.Name,
},
Type: corev1.ServiceTypeClusterIP,
},
Expand All @@ -81,20 +78,20 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment {
var replicas int32 = 1
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: receiverName,
Name: receiver.Name,
Namespace: p.client.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": receiverName,
"app": receiver.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": receiverName,
"app": receiver.Name,
},
},
Spec: corev1.PodSpec{
Expand All @@ -110,7 +107,7 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment {
}},
Containers: []corev1.Container{{
Name: "receiver",
Image: p.config.ImageResolver(receiverName),
Image: p.config.ImageResolver(receiver.Name),
VolumeMounts: []corev1.VolumeMount{{
Name: p.config.ConfigMapName,
ReadOnly: true,
Expand Down
Loading

0 comments on commit 3890b39

Please sign in to comment.