Skip to content

Commit

Permalink
Proper webhook registration for KafkaBroker (#2562)
Browse files Browse the repository at this point in the history
* Register webhook for KafkaBroker

* Fix wrong eventing core version

* Rename BrokerPartial to BrokerStub

* Now doing proper validation with letting upstream webhook do its job validating generic stuff and also other broker classes
  • Loading branch information
aliok authored Sep 5, 2022
1 parent 6866e84 commit e5a6438
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 100 deletions.
10 changes: 2 additions & 8 deletions control-plane/cmd/webhook-kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"knative.dev/pkg/webhook/resourcesemantics/validation"

sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
eventingcorev1beta1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingcorev1 "knative.dev/eventing/pkg/apis/eventing/v1"

messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"

Expand All @@ -49,10 +49,7 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
eventingv1alpha1.SchemeGroupVersion.WithKind("KafkaSink"): &eventingv1alpha1.KafkaSink{},
sourcesv1beta1.SchemeGroupVersion.WithKind("KafkaSource"): &sourcesv1beta1.KafkaSource{},
messagingv1beta1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1beta1.KafkaChannel{},
}

var validationCallbacks = map[schema.GroupVersionKind]validation.Callback{
eventingcorev1beta1.SchemeGroupVersion.WithKind("Broker"): eventingv1.BrokerValidationCallback(),
eventingcorev1.SchemeGroupVersion.WithKind("Broker"): &eventingv1.BrokerStub{},
}

var defaultingCallbacks = map[schema.GroupVersionKind]defaulting.Callback{
Expand Down Expand Up @@ -131,9 +128,6 @@ func NewValidationAdmissionController(ctx context.Context, _ configmap.Watcher)

// Whether to disallow unknown fields.
true,

// Extra validating callbacks to be applied to resources.
validationCallbacks,
)
}

Expand Down
87 changes: 63 additions & 24 deletions control-plane/pkg/apis/eventing/v1/broker_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,92 @@ import (
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

eventing "knative.dev/eventing/pkg/apis/eventing/v1"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/resourcesemantics/validation"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/pkg/webhook/resourcesemantics"
)

type BrokerPartial struct {
type BrokerStub struct {
metav1.TypeMeta `json:",inline"`

// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`
// Spec defines the desired state of the Broker.
Spec BrokerPartialSpec `json:"spec,omitempty"`

Spec eventing.BrokerSpec `json:"spec,omitempty"`

// +optional
Status eventing.BrokerStatus `json:"status,omitempty"`
}

type BrokerPartialSpec struct {
var (
// Check that Broker is resourcesemantics.GenericCRD.
// We do this to make sure the validating/defaulting webhooks can operate on our stub.
// If we don't do this stubbing, we will end up doing the defaulting/validating using the upstream Broker type.
// However, we want to let the upstream webhook to do its job and not us. We only want to default/validate
// the parts relevant to us.
_ resourcesemantics.GenericCRD = (*BrokerStub)(nil)
)

type BrokerStubSpec struct {
// Config is a KReference to the configuration that specifies
// configuration options for this Broker. For example, this could be
// a pointer to a ConfigMap.
// +optional
Config *duckv1.KReference `json:"config,omitempty"`
}

func (b BrokerPartialSpec) Validate(context.Context) error {
if b.Config == nil {
return apis.ErrMissingField("config").ViaField("spec")
func (b *BrokerStub) Validate(context.Context) *apis.FieldError {
if b.Annotations[eventing.BrokerClassAnnotationKey] != kafka.BrokerClass && b.Annotations[eventing.BrokerClassAnnotationKey] != kafka.NamespacedBrokerClass {
// validation for the broker of other classes is done by the other webhooks
return nil
}
if strings.ToLower(b.Config.Kind) != "configmap" {
return apis.ErrInvalidValue(b.Config.Kind, "kind", "Expected ConfigMap").ViaField("config").ViaField("spec")

if b.Spec.Config == nil {
// validation of the Config requiredness is done by the upstream webhook and we want to let it do its job.
// otherwise API server will say that our Kafka admission controller refused it, but it is a generic validation.
return nil
}

// we specifically expect our broker class to use ConfigMap as the config type
if strings.ToLower(b.Spec.Config.Kind) != "configmap" {
return apis.ErrInvalidValue(b.Spec.Config.Kind, "kind", "Expected ConfigMap").ViaField("config").ViaField("spec")
}

// we specifically expect for our broker classes, that there's a namespace in the config
if b.Spec.Config.Namespace == "" {
return apis.ErrMissingField(b.Spec.Config.Namespace, "namespace").ViaField("config").ViaField("spec")
}

// for the namespaced broker, we expect the config to be in the same namespace as the broker
if b.Annotations[eventing.BrokerClassAnnotationKey] == kafka.NamespacedBrokerClass && b.Spec.Config.Namespace != b.Namespace {
return apis.ErrInvalidValue(b.Spec.Config.Namespace, "namespace", "Expected ConfigMap in same namespace with broker resource").ViaField("config").ViaField("spec")
}

return nil
}

func validateBrokerFromUnstructured(ctx context.Context, unstructured *unstructured.Unstructured) error {
broker := BrokerPartial{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), &broker); err != nil {
return err
}
if class, ok := broker.Annotations[eventing.BrokerClassAnnotationKey]; !ok || class != kafka.BrokerClass {
func (b *BrokerStub) SetDefaults(ctx context.Context) {
// do nothing, we let the upstream webhook do its job
}

func (b *BrokerStub) DeepCopyObject() runtime.Object {
if b == nil {
return nil
}
return broker.Spec.Validate(ctx)
}

func BrokerValidationCallback() validation.Callback {
return validation.NewCallback(validateBrokerFromUnstructured, webhook.Create, webhook.Update)
out := &BrokerStub{
TypeMeta: b.TypeMeta, //simple struct
}

b.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
b.Spec.DeepCopyInto(&out.Spec)
b.Status.DeepCopyInto(&out.Status)

return out
}
170 changes: 102 additions & 68 deletions control-plane/pkg/apis/eventing/v1/broker_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,88 +20,122 @@ import (
"context"
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func TestValidateBrokerFromUnstructured(t *testing.T) {
func TestValidate(t *testing.T) {
tests := []struct {
name string
ctx context.Context
unstructured *unstructured.Unstructured
wantErr bool
}{
{
name: "no kafka broker",
ctx: context.Background(),
unstructured: &unstructured.Unstructured{
Object: map[string]interface{}{"spec": map[string]interface{}{}},
name string
b BrokerStub
want *apis.FieldError
}{{
name: "missing annotation",
b: BrokerStub{},
}, {
name: "empty annotation",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"eventing.knative.dev/broker.class": ""},
},
wantErr: false,
},
{
name: "missing config",
ctx: context.Background(),
unstructured: &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
"eventing.knative.dev/broker.class": "Kafka",
},
},
"spec": map[string]interface{}{},
}, {
name: "other broker class",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"eventing.knative.dev/broker.class": "Foo"},
},
},
}, {
name: "no spec.config",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"eventing.knative.dev/broker.class": "Kafka"},
},
},
}, {
name: "no spec.config.namespace",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"eventing.knative.dev/broker.class": "Kafka"},
},
Spec: eventingv1.BrokerSpec{
Config: &duckv1.KReference{
Kind: "ConfigMap",
},
},
wantErr: true,
},
{
name: "unknown kind",
ctx: context.Background(),
unstructured: &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
"eventing.knative.dev/broker.class": "Kafka",
},
},
"spec": map[string]interface{}{
"config": map[string]string{
"apiVersion": "eventing.knative.dev/v1",
"kind": "Broker",
"namespace": "ns",
"name": "name",
},
},
want: apis.ErrMissingField("", "namespace").ViaField("config").ViaField("spec"),
}, {
name: "spec.config is not configmap",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"eventing.knative.dev/broker.class": "Kafka"},
},
Spec: eventingv1.BrokerSpec{
Config: &duckv1.KReference{
Kind: "Service",
Namespace: "foo",
},
},
wantErr: true,
},
{
name: "ConfigMap in config",
ctx: context.Background(),
unstructured: &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
"eventing.knative.dev/broker.class": "Kafka",
},
},
"spec": map[string]interface{}{
"config": map[string]string{
"apiVersion": "v1",
"kind": "ConfigMap",
"namespace": "ns",
"name": "name",
},
},
want: apis.ErrInvalidValue("Service", "kind", "Expected ConfigMap").ViaField("config").ViaField("spec"),
}, {
name: "spec.config.namespace is different",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Namespace: "my-namespace",
Annotations: map[string]string{"eventing.knative.dev/broker.class": "KafkaNamespaced"},
},
Spec: eventingv1.BrokerSpec{
Config: &duckv1.KReference{
Kind: "ConfigMap",
Namespace: "foo",
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateBrokerFromUnstructured(tt.ctx, tt.unstructured); (err != nil) != tt.wantErr {
t.Errorf("validateBrokerFromUnstructured() error = %v, wantErr %v", err, tt.wantErr)
want: apis.ErrInvalidValue("foo", "namespace", "Expected ConfigMap in same namespace with broker resource").ViaField("config").ViaField("spec"),
}, {
name: "valid config - namespaced broker",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Namespace: "my-namespace",
Annotations: map[string]string{"eventing.knative.dev/broker.class": "KafkaNamespaced"},
},
Spec: eventingv1.BrokerSpec{
Config: &duckv1.KReference{
Namespace: "my-namespace",
Name: "name",
Kind: "ConfigMap",
APIVersion: "v1",
},
},
},
}, {
name: "valid config - regular broker",
b: BrokerStub{
ObjectMeta: metav1.ObjectMeta{
Namespace: "my-namespace",
Annotations: map[string]string{"eventing.knative.dev/broker.class": "Kafka"},
},
Spec: eventingv1.BrokerSpec{
Config: &duckv1.KReference{
Name: "name",
Namespace: "my-namespace",
Kind: "ConfigMap",
APIVersion: "v1",
},
},
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := test.b.Validate(context.Background())
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Error("Broker.Validate (-want, +got) =", diff)
}
})
}
Expand Down

0 comments on commit e5a6438

Please sign in to comment.