From 9b044d05ae400c8c34e2fc8cfa8ba167dbb9efc6 Mon Sep 17 00:00:00 2001 From: Ritwik Ranjan Date: Wed, 30 Oct 2024 15:07:27 +0000 Subject: [PATCH] feat: improve network performance test to make it reusable and easily modifiable --- network/benchmarks/netperf/.gitignore | 5 +- network/benchmarks/netperf/Makefile | 23 +- network/benchmarks/netperf/go.mod | 66 +- network/benchmarks/netperf/go.sum | 175 ++++-- network/benchmarks/netperf/launch.go | 442 ++----------- network/benchmarks/netperf/lib/outputlib.go | 82 +++ network/benchmarks/netperf/lib/testlib.go | 73 +++ network/benchmarks/netperf/lib/utilslib.go | 378 +++++++++++ .../netperf/{ => nptest}/Dockerfile | 16 +- network/benchmarks/netperf/nptest/go.mod | 3 + network/benchmarks/netperf/nptest/nptest.go | 595 ++++++------------ .../benchmarks/netperf/nptest/nptest_test.go | 36 -- .../nptest/parsers/bandwidth_parsers.go | 43 ++ .../netperf/nptest/parsers/json_parsers.go | 85 +++ .../netperf/nptest/parsers/parsers_test.go | 113 ++++ .../netperf/nptest/parsers/testdata/tcp.json | 377 +++++++++++ .../nptest/parsers/testdata/tcp_output.json | 18 + .../netperf/nptest/parsers/testdata/udp.json | 88 +++ .../nptest/parsers/testdata/udp_output.json | 18 + .../netperf/nptest/parsers/types.go | 133 ++++ network/benchmarks/netperf/nptest/tests.go | 283 +++++++++ 21 files changed, 2106 insertions(+), 946 deletions(-) create mode 100644 network/benchmarks/netperf/lib/outputlib.go create mode 100644 network/benchmarks/netperf/lib/testlib.go create mode 100644 network/benchmarks/netperf/lib/utilslib.go rename network/benchmarks/netperf/{ => nptest}/Dockerfile (87%) create mode 100644 network/benchmarks/netperf/nptest/go.mod delete mode 100644 network/benchmarks/netperf/nptest/nptest_test.go create mode 100644 network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go create mode 100644 network/benchmarks/netperf/nptest/parsers/json_parsers.go create mode 100644 network/benchmarks/netperf/nptest/parsers/parsers_test.go create mode 100644 network/benchmarks/netperf/nptest/parsers/testdata/tcp.json create mode 100644 network/benchmarks/netperf/nptest/parsers/testdata/tcp_output.json create mode 100644 network/benchmarks/netperf/nptest/parsers/testdata/udp.json create mode 100644 network/benchmarks/netperf/nptest/parsers/testdata/udp_output.json create mode 100644 network/benchmarks/netperf/nptest/parsers/types.go create mode 100644 network/benchmarks/netperf/nptest/tests.go diff --git a/network/benchmarks/netperf/.gitignore b/network/benchmarks/netperf/.gitignore index 62c19394ab..b067f9b902 100644 --- a/network/benchmarks/netperf/.gitignore +++ b/network/benchmarks/netperf/.gitignore @@ -2,6 +2,7 @@ Dockerbuild/* Dockerbuildclient/* nptests netperf-w?.yaml +result*/*.json *.csv *.jpg *.png @@ -11,4 +12,6 @@ netperf-w?.yaml *.pyc .vscode data-* - +kubeConfig +launch +AzurePublicCloud-*.json diff --git a/network/benchmarks/netperf/Makefile b/network/benchmarks/netperf/Makefile index 8ac8bd4c5f..7e1456dd0f 100644 --- a/network/benchmarks/netperf/Makefile +++ b/network/benchmarks/netperf/Makefile @@ -14,18 +14,17 @@ all: docker push launch runtests -DOCKERREPO := girishkalele/netperf-latest +repo_owner := $(shell echo $(REPO_OWNER) | tr '[:upper:]' '[:lower:]') +dockerrepo := $(if $(repo_owner),ghcr.io/$(repo_owner)/nptest,girishkalele/netperf-latest) +image_tag := $(or $(IMAGE_TAG), latest) -docker: launch - mkdir -p Dockerbuild/nptest && \ - cp -f Dockerfile Dockerbuild/ && \ - cp -f nptest/nptest.go Dockerbuild/nptest/ && \ - cp -f go.mod Dockerbuild/ && \ - cp -f go.sum Dockerbuild/ && \ - docker build -t $(DOCKERREPO) Dockerbuild/ +docker: test + mkdir -p Dockerbuild && \ + cp -rf nptest/* Dockerbuild/ && \ + docker build -t $(dockerrepo):$(image_tag) Dockerbuild/ push: docker - gcloud docker push $(DOCKERREPO) + docker push $(dockerrepo):$(image_tag) clean: @rm -f Dockerbuild/* @@ -36,6 +35,10 @@ clean: launch: launch.go go build -o launch launch.go +test: + go test ./... + cd nptest && go test ./... + # 'runtests' is the test runner target runtests: launch @echo Launching network performance tests @@ -44,4 +47,6 @@ runtests: launch cp netperf-latest.csv plotperf && cd plotperf; make plot && mv *png .. && mv *svg .. @echo Results file netperf-latest.csv and SVG/PNG graphs generated successfully +localtest: push + go run launch.go -image=$(dockerrepo):$(image_tag) -json -kubeConfig ./kubeConfig diff --git a/network/benchmarks/netperf/go.mod b/network/benchmarks/netperf/go.mod index 0e4c453f94..0a5620da12 100644 --- a/network/benchmarks/netperf/go.mod +++ b/network/benchmarks/netperf/go.mod @@ -1,37 +1,47 @@ -module k8s.io/perf-tests/network +module k8s.io/perf-tests/network/benchmarks/netperf -go 1.22.4 +go 1.22.7 require ( - k8s.io/api v0.0.0-20181011064954-26c7a45db378 - k8s.io/apimachinery v0.0.0-20181011064652-56cf97ad69c7 - k8s.io/client-go v0.0.0-20181011105049-9b03088ac34f + k8s.io/api v0.30.3 + k8s.io/apimachinery v0.30.3 + k8s.io/client-go v0.30.3 ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/ghodss/yaml v1.0.1-0.20180820084758-c7ce16629ff4 // indirect - github.com/gogo/protobuf v1.1.2-0.20181010092945-fd322a3c4963 // indirect - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect - github.com/golang/protobuf v1.3.1 // indirect - github.com/google/btree v1.0.0 // indirect - github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect - github.com/googleapis/gnostic v0.2.3-0.20180520015035-48a0ecefe2e4 // indirect - github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect - github.com/imdario/mergo v0.3.6 // indirect - github.com/json-iterator/go v1.1.6-0.20180914014843-2433035e5132 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect + github.com/go-openapi/swag v0.23.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/imdario/mergo v0.3.16 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect - github.com/peterbourgon/diskv v2.0.2-0.20180312054125-0646ccaebea1+incompatible // indirect - github.com/spf13/pflag v1.0.3 // indirect - github.com/stretchr/testify v1.5.1 // indirect - golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect - golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect - golang.org/x/oauth2 v0.0.0-20160902055913-3c3a985cb79f // indirect - golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a // indirect - golang.org/x/text v0.3.2 // indirect - golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect - google.golang.org/appengine v1.6.6 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/oauth2 v0.23.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/term v0.24.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/time v0.6.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.2.2 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38 // indirect + k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/network/benchmarks/netperf/go.sum b/network/benchmarks/netperf/go.sum index 1d6b2059cf..0b27ec0e91 100644 --- a/network/benchmarks/netperf/go.sum +++ b/network/benchmarks/netperf/go.sum @@ -1,64 +1,139 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/ghodss/yaml v1.0.1-0.20180820084758-c7ce16629ff4 h1:PaTU+9BARuIOAz1ixvps39DJjfq/SxOj3axzIlh7nFo= -github.com/ghodss/yaml v1.0.1-0.20180820084758-c7ce16629ff4/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/gogo/protobuf v1.1.2-0.20181010092945-fd322a3c4963 h1:qxvPMOEHQtKCpBDHjx3NiCnM3NfMEyeA9c72+IljOP4= -github.com/gogo/protobuf v1.1.2-0.20181010092945-fd322a3c4963/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= -github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= -github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= -github.com/googleapis/gnostic v0.2.3-0.20180520015035-48a0ecefe2e4 h1:Z09Qt6AGDtg0cC/YgnX/iymzIqmZf5aasP5JZFxmkNQ= -github.com/googleapis/gnostic v0.2.3-0.20180520015035-48a0ecefe2e4/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= -github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= -github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= -github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= -github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= -github.com/json-iterator/go v1.1.6-0.20180914014843-2433035e5132 h1:Y1mRUIuPBTUZfUBQW0V9iItFGJnksjotNXSOzTtxfIo= -github.com/json-iterator/go v1.1.6-0.20180914014843-2433035e5132/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= +github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= +github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= +github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= +github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/peterbourgon/diskv v2.0.2-0.20180312054125-0646ccaebea1+incompatible h1:FhnA4iH8T/yYW+AolPONZjGE897wxj3MAzfEbrZkSYw= -github.com/peterbourgon/diskv v2.0.2-0.20180312054125-0646ccaebea1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= -github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/oauth2 v0.0.0-20160902055913-3c3a985cb79f h1:VWt05OS3Al9w09GSPgltoHP90whAHlpik/Bys7HVEDE= -golang.org/x/oauth2 v0.0.0-20160902055913-3c3a985cb79f/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= -golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc= -google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -k8s.io/api v0.0.0-20181011064954-26c7a45db378 h1:lBCdUUoFs8PEMxhQoSNnVXxpKcrNrzuOw4/ofBd+0do= -k8s.io/api v0.0.0-20181011064954-26c7a45db378/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= -k8s.io/apimachinery v0.0.0-20181011064652-56cf97ad69c7 h1:l1WAFAYv7ba/kpouP/gXH+l3ZqsRKbGH8F1VMDYmIbM= -k8s.io/apimachinery v0.0.0-20181011064652-56cf97ad69c7/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= -k8s.io/client-go v0.0.0-20181011105049-9b03088ac34f h1:+izcn/RUZFQiB2q/wTE/xgLViQ5iFIZhlQParzg2ha4= -k8s.io/client-go v0.0.0-20181011105049-9b03088ac34f/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ= +k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04= +k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= +k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= +k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38 h1:1dWzkmJrrprYvjGwh9kEUxmcUV/CtNU8QM7h1FLWQOo= +k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38/go.mod h1:coRQXBK9NxO98XUv3ZD6AK3xzHCxV6+b7lrquKwaKzA= +k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 h1:MDF6h2H/h4tbzmtIKTuctcwZmY0tY9mD9fNT47QO6HI= +k8s.io/utils v0.0.0-20240921022957-49e7df575cb6/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/network/benchmarks/netperf/launch.go b/network/benchmarks/netperf/launch.go index 1c1d847a0e..b2b8c903a8 100644 --- a/network/benchmarks/netperf/launch.go +++ b/network/benchmarks/netperf/launch.go @@ -30,50 +30,44 @@ import ( "flag" "fmt" "os" - "strings" - "time" api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" + "k8s.io/perf-tests/network/benchmarks/netperf/lib" ) const ( - csvDataMarker = "GENERATING CSV OUTPUT" - csvEndDataMarker = "END CSV DATA" - runUUID = "latest" - orchestratorPort = 5202 - iperf3Port = 5201 - qperf19766 = 19766 - qperf19765 = 19765 - netperfPort = 12865 + csvDataMarker = "GENERATING CSV OUTPUT" + csvEndDataMarker = "END CSV DATA" + jsonDataMarker = "GENERATING JSON OUTPUT" + jsonEndDataMarker = "END JSON OUTPUT" + runUUID = "latest" + orchestratorPort = 5202 + iperf3Port = 5201 + qperf19766 = 19766 + qperf19765 = 19765 + netperfPort = 12865 ) var ( - iterations int - hostnetworking bool - tag string - kubeConfig string - testNamespace string - netperfImage string - cleanupOnly bool - - everythingSelector metav1.ListOptions = metav1.ListOptions{} + iterations int + tag string + kubeConfig string + testNamespace string + netperfImage string + cleanupOnly bool primaryNode api.Node secondaryNode api.Node testFrom, testTo int + + jsonOutput bool ) func init() { - flag.BoolVar(&hostnetworking, "hostnetworking", false, - "(boolean) Enable Host Networking Mode for PODs") flag.IntVar(&iterations, "iterations", 1, "Number of iterations to run") - flag.StringVar(&tag, "tag", runUUID, "CSV file suffix") + flag.StringVar(&tag, "tag", runUUID, "Result file suffix") flag.StringVar(&netperfImage, "image", "sirot/netperf-latest", "Docker image used to run the network tests") flag.StringVar(&testNamespace, "namespace", "netperf", "Test namespace to run netperf pods") defaultKubeConfig := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME")) @@ -83,361 +77,7 @@ func init() { "(boolean) Run the cleanup resources phase only (use this flag to clean up orphaned resources from a test run)") flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom") flag.IntVar(&testTo, "testTo", 5, "end at test number testTo") -} - -func setupClient() *kubernetes.Clientset { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - panic(err) - } - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - panic(err) - } - - return clientset -} - -// getMinions : Only return schedulable/worker nodes -func getMinionNodes(c *kubernetes.Clientset) *api.NodeList { - nodes, err := c.CoreV1().Nodes().List( - metav1.ListOptions{ - FieldSelector: "spec.unschedulable=false", - }) - if err != nil { - fmt.Println("Failed to fetch nodes", err) - return nil - } - return nodes -} - -func cleanup(c *kubernetes.Clientset) { - // Cleanup existing rcs, pods and services in our namespace - rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(everythingSelector) - if err != nil { - fmt.Println("Failed to get replication controllers", err) - return - } - for _, rc := range rcs.Items { - fmt.Println("Deleting rc", rc.GetName()) - if err := c.CoreV1().ReplicationControllers(testNamespace).Delete( - rc.GetName(), &metav1.DeleteOptions{}); err != nil { - fmt.Println("Failed to delete rc", rc.GetName(), err) - } - } - pods, err := c.CoreV1().Pods(testNamespace).List(everythingSelector) - if err != nil { - fmt.Println("Failed to get pods", err) - return - } - for _, pod := range pods.Items { - fmt.Println("Deleting pod", pod.GetName()) - if err := c.CoreV1().Pods(testNamespace).Delete(pod.GetName(), &metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil { - fmt.Println("Failed to delete pod", pod.GetName(), err) - } - } - svcs, err := c.CoreV1().Services(testNamespace).List(everythingSelector) - if err != nil { - fmt.Println("Failed to get services", err) - return - } - for _, svc := range svcs.Items { - fmt.Println("Deleting svc", svc.GetName()) - err := c.CoreV1().Services(testNamespace).Delete( - svc.GetName(), &metav1.DeleteOptions{}) - if err != nil { - fmt.Println("Failed to get service", err) - } - } -} - -// createServices: Long-winded function to programmatically create our two services -func createServices(c *kubernetes.Clientset) bool { - // Create our namespace if not present - if _, err := c.CoreV1().Namespaces().Get(testNamespace, metav1.GetOptions{}); err != nil { - _, err := c.CoreV1().Namespaces().Create(&api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}) - if err != nil { - fmt.Println("Failed to create service", err) - } - } - - // Create the orchestrator service that points to the coordinator pod - orchLabels := map[string]string{"app": "netperf-orch"} - orchService := &api.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "netperf-orch", - }, - Spec: api.ServiceSpec{ - Selector: orchLabels, - Ports: []api.ServicePort{{ - Name: "netperf-orch", - Protocol: api.ProtocolTCP, - Port: orchestratorPort, - TargetPort: intstr.FromInt(orchestratorPort), - }}, - Type: api.ServiceTypeClusterIP, - }, - } - if _, err := c.CoreV1().Services(testNamespace).Create(orchService); err != nil { - fmt.Println("Failed to create orchestrator service", err) - return false - } - fmt.Println("Created orchestrator service") - - // Create the netperf-w2 service that points a clusterIP at the worker 2 pod - netperfW2Labels := map[string]string{"app": "netperf-w2"} - netperfW2Service := &api.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "netperf-w2", - }, - Spec: api.ServiceSpec{ - Selector: netperfW2Labels, - Ports: []api.ServicePort{ - { - Name: "netperf-w2", - Protocol: api.ProtocolTCP, - Port: iperf3Port, - TargetPort: intstr.FromInt(iperf3Port), - }, - { - Name: "netperf-w2-qperf19766", - Protocol: api.ProtocolTCP, - Port: qperf19766, - TargetPort: intstr.FromInt(qperf19766), - }, - { - Name: "netperf-w2-qperf19765", - Protocol: api.ProtocolTCP, - Port: qperf19765, - TargetPort: intstr.FromInt(qperf19765), - }, - { - Name: "netperf-w2-sctp", - Protocol: api.ProtocolSCTP, - Port: iperf3Port, - TargetPort: intstr.FromInt(iperf3Port), - }, - { - Name: "netperf-w2-udp", - Protocol: api.ProtocolUDP, - Port: iperf3Port, - TargetPort: intstr.FromInt(iperf3Port), - }, - { - Name: "netperf-w2-netperf", - Protocol: api.ProtocolTCP, - Port: netperfPort, - TargetPort: intstr.FromInt(netperfPort), - }, - }, - Type: api.ServiceTypeClusterIP, - }, - } - if _, err := c.CoreV1().Services(testNamespace).Create(netperfW2Service); err != nil { - fmt.Println("Failed to create netperf-w2 service", err) - return false - } - fmt.Println("Created netperf-w2 service") - return true -} - -// createRCs - Create replication controllers for all workers and the orchestrator -func createRCs(c *kubernetes.Clientset) bool { - // Create the orchestrator RC - name := "netperf-orch" - fmt.Println("Creating replication controller", name) - replicas := int32(1) - - _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(&api.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Spec: api.ReplicationControllerSpec{ - Replicas: &replicas, - Selector: map[string]string{"app": name}, - Template: &api.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"app": name}, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: name, - Image: netperfImage, - Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}}, - Args: []string{ - "--mode=orchestrator", - fmt.Sprintf("--testFrom=%d", testFrom), - fmt.Sprintf("--testTo=%d", testTo), - }, - ImagePullPolicy: "Always", - }, - }, - TerminationGracePeriodSeconds: new(int64), - }, - }, - }, - }) - if err != nil { - fmt.Println("Error creating orchestrator replication controller", err) - return false - } - fmt.Println("Created orchestrator replication controller") - for i := 1; i <= 3; i++ { - // Bring up pods slowly - time.Sleep(3 * time.Second) - kubeNode := primaryNode.GetName() - if i == 3 { - kubeNode = secondaryNode.GetName() - } - name = fmt.Sprintf("netperf-w%d", i) - fmt.Println("Creating replication controller", name) - portSpec := []api.ContainerPort{} - if i > 1 { - // Worker W1 is a client-only pod - no ports are exposed - portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP}) - portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP}) - } - - workerEnv := []api.EnvVar{ - {Name: "worker", Value: name}, - {Name: "kubeNode", Value: kubeNode}, - {Name: "podname", Value: name}, - } - - replicas := int32(1) - - _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(&api.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Spec: api.ReplicationControllerSpec{ - Replicas: &replicas, - Selector: map[string]string{"app": name}, - Template: &api.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"app": name}, - }, - Spec: api.PodSpec{ - NodeName: kubeNode, - Containers: []api.Container{ - { - Name: name, - Image: netperfImage, - Ports: portSpec, - Args: []string{"--mode=worker"}, - Env: workerEnv, - ImagePullPolicy: "Always", - }, - }, - TerminationGracePeriodSeconds: new(int64), - }, - }, - }, - }) - if err != nil { - fmt.Println("Error creating orchestrator replication controller", name, ":", err) - return false - } - } - - return true -} - -func getOrchestratorPodName(pods *api.PodList) string { - for _, pod := range pods.Items { - if strings.Contains(pod.GetName(), "netperf-orch-") { - return pod.GetName() - } - } - return "" -} - -// Retrieve the logs for the pod/container and check if csv data has been generated -func getCsvResultsFromPod(c *kubernetes.Clientset, podName string) *string { - body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{Timestamps: false}).DoRaw() - if err != nil { - fmt.Printf("Error (%s) reading logs from pod %s", err, podName) - return nil - } - logData := string(body) - index := strings.Index(logData, csvDataMarker) - endIndex := strings.Index(logData, csvEndDataMarker) - if index == -1 || endIndex == -1 { - return nil - } - csvData := string(body[index+len(csvDataMarker)+1 : endIndex]) - return &csvData -} - -// processCsvData : Process the CSV datafile and generate line and bar graphs -func processCsvData(csvData *string) bool { - t := time.Now().UTC() - outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag) - outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405")) - fmt.Printf("Test concluded - CSV raw data written to %s/%scsv\n", outputFileDirectory, outputFilePrefix) - if _, err := os.Stat(outputFileDirectory); os.IsNotExist(err) { - err := os.Mkdir(outputFileDirectory, 0766) - if err != nil { - fmt.Println("Error creating directory", err) - return false - } - - } - fd, err := os.OpenFile(fmt.Sprintf("%s/%scsv", outputFileDirectory, outputFilePrefix), os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - fmt.Println("ERROR writing output CSV datafile", err) - return false - } - _, err = fd.WriteString(*csvData) - if err != nil { - fmt.Println("Error writing string", err) - return false - } - fd.Close() - return true -} - -func executeTests(c *kubernetes.Clientset) bool { - for i := 0; i < iterations; i++ { - cleanup(c) - if !createServices(c) { - fmt.Println("Failed to create services - aborting test") - return false - } - time.Sleep(3 * time.Second) - if !createRCs(c) { - fmt.Println("Failed to create replication controllers - aborting test") - return false - } - fmt.Println("Waiting for netperf pods to start up") - - var orchestratorPodName string - for len(orchestratorPodName) == 0 { - fmt.Println("Waiting for orchestrator pod creation") - time.Sleep(60 * time.Second) - var pods *api.PodList - var err error - if pods, err = c.CoreV1().Pods(testNamespace).List(everythingSelector); err != nil { - fmt.Println("Failed to fetch pods - waiting for pod creation", err) - continue - } - orchestratorPodName = getOrchestratorPodName(pods) - } - fmt.Println("Orchestrator Pod is", orchestratorPodName) - - // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container - for { - // Monitor the orchestrator pod for the CSV results file - csvdata := getCsvResultsFromPod(c, orchestratorPodName) - if csvdata == nil { - fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...waiting for orchestrator to write CSV file...") - time.Sleep(60 * time.Second) - continue - } - if processCsvData(csvdata) { - break - } - } - fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i) - } - return false + flag.BoolVar(&jsonOutput, "json", false, "Output JSON data along with CSV data") } func main() { @@ -445,31 +85,29 @@ func main() { fmt.Println("Network Performance Test") fmt.Println("Parameters :") fmt.Println("Iterations : ", iterations) - fmt.Println("Host Networking : ", hostnetworking) fmt.Println("Test Namespace : ", testNamespace) fmt.Println("Docker image : ", netperfImage) fmt.Println("------------------------------------------------------------") - var c *kubernetes.Clientset - if c = setupClient(); c == nil { - fmt.Println("Failed to setup REST client to Kubernetes cluster") - return - } - if cleanupOnly { - cleanup(c) - return - } - nodes := getMinionNodes(c) - if nodes == nil { - return + testParams := lib.TestParams{ + Iterations: iterations, + Tag: tag, + TestNamespace: testNamespace, + Image: netperfImage, + CleanupOnly: cleanupOnly, + TestFrom: testFrom, + TestTo: testTo, + JsonOutput: jsonOutput, + KubeConfig: kubeConfig, + } + results, err := lib.PerformTests(testParams) + if err != nil { + fmt.Println(err) + os.Exit(1) } - if len(nodes.Items) < 2 { - fmt.Println("Insufficient number of nodes for test (need minimum 2 nodes)") - return + fmt.Println("Results :") + for _, result := range results { + fmt.Println("CSV Result File : ", result.CsvResultFile) + fmt.Println("JSON Result File : ", result.JsonResultFile) } - primaryNode = nodes.Items[0] - secondaryNode = nodes.Items[1] - fmt.Printf("Selected primary,secondary nodes = (%s, %s)\n", primaryNode.GetName(), secondaryNode.GetName()) - executeTests(c) - cleanup(c) } diff --git a/network/benchmarks/netperf/lib/outputlib.go b/network/benchmarks/netperf/lib/outputlib.go new file mode 100644 index 0000000000..c010654946 --- /dev/null +++ b/network/benchmarks/netperf/lib/outputlib.go @@ -0,0 +1,82 @@ +package lib + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + api "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" +) + +func getLogsFromPod(c *kubernetes.Clientset, podName, testNamespace string) (*string, error) { + var logData *string + + // Retry to get logs from the pod, as we are polling at intervals + // and there might be intermittent network issues, a long retry time + // is acceptable. + err := retry.OnError(wait.Backoff{ + Steps: 5, + Duration: 2 * time.Second, + Factor: 2.0, + Jitter: 100, + }, func(err error) bool { + return true + }, func() error { + body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{}).DoRaw(context.Background()) + if err != nil { + return err + } + data := string(body) + logData = &data + return nil + }) + + if err != nil { + return nil, fmt.Errorf("error reading logs from pod %s: %v", podName, err) + } + + return logData, nil +} + +func getDataFromPod(c *kubernetes.Clientset, podName, startMarker, endMarker, testNamespace string) (*string, error) { + logData, err := getLogsFromPod(c, podName, testNamespace) + if err != nil { + return nil, err + } + index := strings.Index(*logData, startMarker) + endIndex := strings.Index(*logData, endMarker) + if index == -1 || endIndex == -1 { + return nil, nil + } + data := string((*logData)[index+len(startMarker)+1 : endIndex]) + return &data, nil +} + +func processRawData(rawData *string, testNamespace, tag, fileExtension string) (string, error) { + t := time.Now().UTC() + outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag) + outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405")) + outputFilePath := fmt.Sprintf("%s/%s%s", outputFileDirectory, outputFilePrefix, fileExtension) + fmt.Printf("Test concluded - Raw data written to %s\n", outputFilePath) + if _, err := os.Stat(outputFileDirectory); os.IsNotExist(err) { + err := os.Mkdir(outputFileDirectory, 0766) + if err != nil { + return "", err + } + } + fd, err := os.OpenFile(outputFilePath, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return "", fmt.Errorf("ERROR writing output datafile: %s", err) + } + defer fd.Close() + _, err = fd.WriteString(*rawData) + if err != nil { + return "", fmt.Errorf("error writing string: %s", err) + } + return outputFilePath, nil +} diff --git a/network/benchmarks/netperf/lib/testlib.go b/network/benchmarks/netperf/lib/testlib.go new file mode 100644 index 0000000000..8383bc9c5e --- /dev/null +++ b/network/benchmarks/netperf/lib/testlib.go @@ -0,0 +1,73 @@ +package lib + +import ( + "fmt" +) + +const ( + csvDataMarker = "GENERATING CSV OUTPUT" + csvEndDataMarker = "END CSV DATA" + jsonDataMarker = "GENERATING JSON OUTPUT" + jsonEndDataMarker = "END JSON OUTPUT" + runUUID = "latest" + orchestratorPort = 5202 + iperf3Port = 5201 + qperf19766 = 19766 + qperf19765 = 19765 + netperfPort = 12865 +) + +type TestParams struct { + Iterations int + Tag string + TestNamespace string + Image string + CleanupOnly bool + TestFrom int + TestTo int + JsonOutput bool + KubeConfig string +} + +type Result struct { + JsonResultFile string + CsvResultFile string +} + +func PerformTests(testParams TestParams) ([]Result, error) { + c, err := setupClient(testParams.KubeConfig) + if err != nil { + return nil, fmt.Errorf("failed to create clientset: %v", err) + } + nodes, err := getMinionNodes(c) + if err != nil { + return nil, fmt.Errorf("failed to get nodes: %v", err) + } + if len(nodes.Items) < 2 { + return nil, fmt.Errorf("at least 2 nodes are required to run the tests") + } + primaryNode := nodes.Items[0] + secondaryNode := nodes.Items[1] + + fmt.Println("Primary Node : ", primaryNode.Name) + fmt.Println("Secondary Node : ", secondaryNode.Name) + + if testParams.CleanupOnly { + cleanup(c, testParams.TestNamespace) + return nil, nil + } + + fmt.Println("Network Performance Test") + fmt.Println("Parameters :") + fmt.Println("Iterations : ", testParams.Iterations) + fmt.Println("Test Namespace : ", testParams.TestNamespace) + fmt.Println("Docker image : ", testParams.Image) + fmt.Println("------------------------------------------------------------") + + results, err := executeTests(c, testParams, primaryNode, secondaryNode) + if err != nil { + return nil, fmt.Errorf("failed to execute tests: %v", err) + } + cleanup(c, testParams.TestNamespace) + return results, nil +} diff --git a/network/benchmarks/netperf/lib/utilslib.go b/network/benchmarks/netperf/lib/utilslib.go new file mode 100644 index 0000000000..60e101d5ed --- /dev/null +++ b/network/benchmarks/netperf/lib/utilslib.go @@ -0,0 +1,378 @@ +package lib + +import ( + "context" + "fmt" + "time" + + api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +var everythingSelector metav1.ListOptions = metav1.ListOptions{} + +func setupClient(kubeConfig string) (*kubernetes.Clientset, error) { + config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) + if err != nil { + return nil, fmt.Errorf("failed to create config: %v", err) + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create clientset: %v", err) + } + + return clientset, nil +} + +func getMinionNodes(c *kubernetes.Clientset) (*api.NodeList, error) { + nodes, err := c.CoreV1().Nodes().List( + context.Background(), + metav1.ListOptions{ + FieldSelector: "spec.unschedulable=false", + // for now the tests can only run on linux/amd64 nodes + LabelSelector: "kubernetes.io/os=linux,kubernetes.io/arch=amd64", + }) + if err != nil { + return nil, fmt.Errorf("failed to get nodes: %v", err) + } + return nodes, nil +} + +func createServices(c *kubernetes.Clientset, testNamespace string) error { + if _, err := c.CoreV1().Namespaces().Get(context.Background(), testNamespace, metav1.GetOptions{}); err != nil { + _, err := c.CoreV1().Namespaces().Create(context.Background(), &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create namespace %s: %v", testNamespace, err) + } + } + + // Create the orchestrator service that points to the coordinator pod + orchLabels := map[string]string{"app": "netperf-orch"} + orchService := &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "netperf-orch", + }, + Spec: api.ServiceSpec{ + Selector: orchLabels, + Ports: []api.ServicePort{{ + Name: "netperf-orch", + Protocol: api.ProtocolTCP, + Port: orchestratorPort, + TargetPort: intstr.FromInt(orchestratorPort), + }}, + Type: api.ServiceTypeClusterIP, + }, + } + if _, err := c.CoreV1().Services(testNamespace).Create(context.Background(), orchService, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create orchestrator service: %v", err) + } + fmt.Println("Created orchestrator service") + + // Create the netperf-w2 service that points a clusterIP at the worker 2 pod + netperfW2Labels := map[string]string{"app": "netperf-w2"} + netperfW2Service := &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "netperf-w2", + }, + Spec: api.ServiceSpec{ + Selector: netperfW2Labels, + Ports: []api.ServicePort{ + { + Name: "netperf-w2", + Protocol: api.ProtocolTCP, + Port: iperf3Port, + TargetPort: intstr.FromInt(iperf3Port), + }, + { + Name: "netperf-w2-qperf19766", + Protocol: api.ProtocolTCP, + Port: qperf19766, + TargetPort: intstr.FromInt(qperf19766), + }, + { + Name: "netperf-w2-qperf19765", + Protocol: api.ProtocolTCP, + Port: qperf19765, + TargetPort: intstr.FromInt(qperf19765), + }, + { + Name: "netperf-w2-sctp", + Protocol: api.ProtocolSCTP, + Port: iperf3Port, + TargetPort: intstr.FromInt(iperf3Port), + }, + { + Name: "netperf-w2-udp", + Protocol: api.ProtocolUDP, + Port: iperf3Port, + TargetPort: intstr.FromInt(iperf3Port), + }, + { + Name: "netperf-w2-netperf", + Protocol: api.ProtocolTCP, + Port: netperfPort, + TargetPort: intstr.FromInt(netperfPort), + }, + }, + Type: api.ServiceTypeClusterIP, + }, + } + if _, err := c.CoreV1().Services(testNamespace).Create(context.Background(), netperfW2Service, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create netperf-w2 service: %v", err) + } + fmt.Println("Created netperf-w2 service") + return nil +} + +func createRCs(c *kubernetes.Clientset, testParams TestParams, primaryNode, secondaryNode api.Node) error { + // Create the orchestrator RC + name := "netperf-orch" + fmt.Println("Creating replication controller", name) + replicas := int32(1) + + _, err := c.CoreV1().ReplicationControllers(testParams.TestNamespace).Create(context.Background(), &api.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: api.ReplicationControllerSpec{ + Replicas: &replicas, + Selector: map[string]string{"app": name}, + Template: &api.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": name}, + }, + Spec: api.PodSpec{ + NodeSelector: map[string]string{"kubernetes.io/os": "linux", "kubernetes.io/arch": "amd64"}, + Containers: []api.Container{ + { + Name: name, + Image: testParams.Image, + Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}}, + Args: []string{ + "--mode=orchestrator", + fmt.Sprintf("--testFrom=%d", testParams.TestFrom), + fmt.Sprintf("--testTo=%d", testParams.TestTo), + }, + ImagePullPolicy: "Always", + }, + }, + TerminationGracePeriodSeconds: new(int64), + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("error creating orchestrator replication controller %s: %v", name, err) + } + fmt.Println("Created orchestrator replication controller") + for i := 1; i <= 3; i++ { + // Bring up pods slowly + time.Sleep(3 * time.Second) + kubeNode := primaryNode.GetName() + if i == 3 { + kubeNode = secondaryNode.GetName() + } + name = fmt.Sprintf("netperf-w%d", i) + fmt.Println("Creating replication controller", name) + portSpec := []api.ContainerPort{} + if i > 1 { + // Worker W1 is a client-only pod - no ports are exposed + portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP}) + portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP}) + } + + workerEnv := []api.EnvVar{ + {Name: "worker", Value: name}, + {Name: "kubeNode", Value: kubeNode}, + {Name: "podname", Value: name}, + } + + replicas := int32(1) + + _, err := c.CoreV1().ReplicationControllers(testParams.TestNamespace).Create(context.Background(), &api.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: api.ReplicationControllerSpec{ + Replicas: &replicas, + Selector: map[string]string{"app": name}, + Template: &api.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": name}, + }, + Spec: api.PodSpec{ + NodeName: kubeNode, + Containers: []api.Container{ + { + Name: name, + Image: testParams.Image, + Ports: portSpec, + Args: []string{ + "--mode=worker", + fmt.Sprintf("--testFrom=%d", testParams.TestFrom), + fmt.Sprintf("--testTo=%d", testParams.TestTo), + }, + Env: workerEnv, + ImagePullPolicy: "Always", + }, + }, + TerminationGracePeriodSeconds: new(int64), + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("error creating worker replication controller %s: %v", name, err) + } + } + + return nil +} + +func executeTests(c *kubernetes.Clientset, testParams TestParams, primaryNode, secondaryNode api.Node) ([]Result, error) { + results := make([]Result, testParams.Iterations) + for i := 0; i < testParams.Iterations; i++ { + cleanup(c, testParams.TestNamespace) + if err := createServices(c, testParams.TestNamespace); err != nil { + return nil, fmt.Errorf("failed to create services: %v", err) + } + time.Sleep(3 * time.Second) + if err := createRCs(c, testParams, primaryNode, secondaryNode); err != nil { + return nil, fmt.Errorf("failed to create replication controllers: %v", err) + } + fmt.Println("Waiting for netperf pods to start up") + + orchestratorPodName, err := getOrchestratorPodName(c, testParams.TestNamespace, 3*time.Minute) + if err != nil { + return nil, fmt.Errorf("failed to get orchestrator pod name: %v", err) + } + fmt.Println("Orchestrator Pod is", orchestratorPodName) + + var jsonFilePath string + var csvFilePath string + + // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container + for { + // Monitor the orchestrator pod for the CSV results file + csvdata, err := getDataFromPod(c, orchestratorPodName, csvDataMarker, csvEndDataMarker, testParams.TestNamespace) + if err != nil { + return nil, fmt.Errorf("error getting CSV data from orchestrator pod: %v", err) + } + if csvdata == nil { + fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...") + time.Sleep(60 * time.Second) + continue + } + + if testParams.JsonOutput { + jsondata, err := getDataFromPod(c, orchestratorPodName, jsonDataMarker, jsonEndDataMarker, testParams.TestNamespace) + if err != nil { + return nil, fmt.Errorf("error getting JSON data from orchestrator pod: %v", err) + } + if jsondata == nil { + fmt.Println("Scanned orchestrator pod filesystem - no json data found yet...") + time.Sleep(60 * time.Second) + continue + } + jsonFilePath, err = processRawData(jsondata, testParams.TestNamespace, testParams.Tag, "json") + if err != nil { + return nil, fmt.Errorf("error processing JSON data: %v", err) + } + } + + csvFilePath, err = processRawData(csvdata, testParams.TestNamespace, testParams.Tag, "csv") + if err != nil { + return nil, fmt.Errorf("error processing CSV data: %v", err) + } + + break + } + fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i) + results[i] = Result{JsonResultFile: jsonFilePath, CsvResultFile: csvFilePath} + } + return results, nil +} + +func getOrchestratorPodName(c *kubernetes.Clientset, testNamespace string, timeout time.Duration) (string, error) { + timeoutCh := time.After(timeout) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + fmt.Println("Waiting for orchestrator pod creation") + pods, err := c.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=netperf-orch", + }) + if err != nil { + fmt.Println("Failed to fetch pods - waiting for pod creation", err) + continue + } + if len(pods.Items) == 0 { + fmt.Println("No orchestrator pods found yet") + continue + } + + pod := pods.Items[0] + podStatus := pod.Status + + if podStatus.Phase == api.PodRunning { + return pod.GetName(), nil + } + + for _, containerStatus := range podStatus.ContainerStatuses { + if waiting := containerStatus.State.Waiting; waiting != nil { + switch waiting.Reason { + case "ErrImagePull", "CrashLoopBackOff", "ImagePullBackOff": + return "", fmt.Errorf("orchestrator pod error: %s - %v", waiting.Reason, waiting.Message) + } + } + } + fmt.Println("Orchestrator pod is not running yet") + case <-timeoutCh: + return "", fmt.Errorf("timed out waiting for orchestrator pod to be created") + } + } +} + +func cleanup(c *kubernetes.Clientset, testNamespace string) { + syncCtx := context.Background() + // Cleanup existing rcs, pods and services in our namespace + rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(syncCtx, everythingSelector) + if err != nil { + fmt.Println("Failed to get replication controllers", err) + return + } + for _, rc := range rcs.Items { + fmt.Println("Deleting rc", rc.GetName()) + if err := c.CoreV1().ReplicationControllers(testNamespace).Delete( + context.Background(), + rc.GetName(), metav1.DeleteOptions{}); err != nil { + fmt.Println("Failed to delete rc", rc.GetName(), err) + } + } + pods, err := c.CoreV1().Pods(testNamespace).List(syncCtx, everythingSelector) + if err != nil { + fmt.Println("Failed to get pods", err) + return + } + for _, pod := range pods.Items { + fmt.Println("Deleting pod", pod.GetName()) + if err := c.CoreV1().Pods(testNamespace).Delete(context.Background(), pod.GetName(), metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil { + fmt.Println("Failed to delete pod", pod.GetName(), err) + } + } + svcs, err := c.CoreV1().Services(testNamespace).List(syncCtx, everythingSelector) + if err != nil { + fmt.Println("Failed to get services", err) + return + } + for _, svc := range svcs.Items { + fmt.Println("Deleting svc", svc.GetName()) + err := c.CoreV1().Services(testNamespace).Delete( + context.Background(), svc.GetName(), metav1.DeleteOptions{}) + if err != nil { + fmt.Println("Failed to get service", err) + } + } +} diff --git a/network/benchmarks/netperf/Dockerfile b/network/benchmarks/netperf/nptest/Dockerfile similarity index 87% rename from network/benchmarks/netperf/Dockerfile rename to network/benchmarks/netperf/nptest/Dockerfile index 37777b3bd1..f9b539fdb5 100644 --- a/network/benchmarks/netperf/Dockerfile +++ b/network/benchmarks/netperf/nptest/Dockerfile @@ -21,20 +21,18 @@ # # Args: --mode=worker --host= --port=5202 # -ARG GOLANG_VERSION=1.18 -FROM golang:${GOLANG_VERSION} as builder +FROM golang:bullseye AS builder WORKDIR /workspace -COPY nptest/nptest.go nptest.go -COPY go.sum go.sum -COPY go.mod go.mod +COPY . . RUN go build -o nptests FROM debian:bullseye ENV LD_LIBRARY_PATH=/usr/local/lib -MAINTAINER Girish Kalele +LABEL org.opencontainers.image.description "Network performance tests in k8s engine" + # install binary and remove cache RUN apt-get update \ && apt-get install -y curl wget net-tools gcc make libsctp-dev git autotools-dev automake \ @@ -56,6 +54,12 @@ RUN cd iperf-3.1 && ./configure --prefix=/usr/local --bindir /usr/local/bin && m RUN curl -LO https://github.com/HewlettPackard/netperf/archive/netperf-2.7.0.tar.gz && tar -xzf netperf-2.7.0.tar.gz && mv netperf-netperf-2.7.0/ netperf-2.7.0 RUN cd netperf-2.7.0 && ./configure --prefix=/usr/local --bindir /usr/local/bin && make CFLAGS=-fcommon && make install +# Validate the installation of qperf, iperf3 and netperf +RUN usr/local/bin/qperf --help +RUN usr/local/bin/iperf3 -v +RUN usr/local/bin/netperf -V +RUN usr/local/bin/netserver -V + COPY --from=builder /workspace/nptests /usr/bin/ ENTRYPOINT ["nptests"] diff --git a/network/benchmarks/netperf/nptest/go.mod b/network/benchmarks/netperf/nptest/go.mod new file mode 100644 index 0000000000..eddeeef3ab --- /dev/null +++ b/network/benchmarks/netperf/nptest/go.mod @@ -0,0 +1,3 @@ +module k8s.io/perf-tests/network/nptest + +go 1.23 diff --git a/network/benchmarks/netperf/nptest/nptest.go b/network/benchmarks/netperf/nptest/nptest.go index 464375f416..0022b0c92c 100644 --- a/network/benchmarks/netperf/nptest/nptest.go +++ b/network/benchmarks/netperf/nptest/nptest.go @@ -26,6 +26,7 @@ package main // Imports only base Golang packages import ( "bytes" + "encoding/json" "flag" "fmt" "log" @@ -34,9 +35,7 @@ import ( "net/rpc" "os" "os/exec" - "regexp" "strconv" - "strings" "sync" "time" ) @@ -57,15 +56,24 @@ var testFrom, testTo int var workerStateMap map[string]*workerState -var iperfTCPOutputRegexp *regexp.Regexp -var iperfSCTPOutputRegexp *regexp.Regexp -var iperfUDPOutputRegexp *regexp.Regexp -var netperfOutputRegexp *regexp.Regexp -var iperfCPUOutputRegexp *regexp.Regexp - var dataPoints map[string][]point var dataPointKeys []string var datapointsFlushed bool +var active_tests []*TestCase + +type Result struct { + Label string `json:"label"` + Result json.RawMessage `json:"result"` +} + +var results []Result + +func addResult(label, resultJson string) { + results = append(results, Result{ + Label: label, + Result: json.RawMessage(resultJson), + }) +} var globalLock sync.Mutex @@ -77,6 +85,8 @@ const ( netperfPath = "/usr/local/bin/netperf" netperfServerPath = "/usr/local/bin/netserver" outputCaptureFile = "/tmp/output.txt" + jsonDataMarker = "GENERATING JSON OUTPUT" + jsonEndDataMarker = "END JSON OUTPUT" mssMin = 96 mssMax = 1460 mssStepSize = 64 @@ -84,17 +94,10 @@ const ( msgSizeMin = 1 parallelStreams = "8" rpcServicePort = "5202" + iperf3SctpPort = "5004" localhostIPv4Address = "127.0.0.1" ) -const ( - iperfTCPTest = iota - qperfTCPTest - iperfUDPTest - iperfSctpTest - netperfTest -) - // NetPerfRPC service that exposes RegisterClient and ReceiveOutput for clients type NetPerfRPC int @@ -107,27 +110,26 @@ type ClientRegistrationData struct { } // IperfClientWorkItem represents a single task for an Iperf client -type IperfClientWorkItem struct { - Host string - Port string - MSS int - MsgSize int - Type int +type ClientWorkItem struct { + Host string + Port string + Params TestParams } // IperfServerWorkItem represents a single task for an Iperf server -type IperfServerWorkItem struct { +type ServerWorkItem struct { ListenPort string Timeout int } // WorkItem represents a single task for a worker type WorkItem struct { - IsClientItem bool - IsServerItem bool - IsIdle bool - ClientItem IperfClientWorkItem - ServerItem IperfServerWorkItem + IsClientItem bool + IsServerItem bool + IsIdle bool + TestCaseIndex int + ClientItem ClientWorkItem + ServerItem ServerWorkItem } type workerState struct { @@ -139,70 +141,21 @@ type workerState struct { // WorkerOutput stores the results from a single worker type WorkerOutput struct { - Output string - Code int - Worker string - Type int -} - -type testcase struct { - SourceNode string - DestinationNode string - Label string - ClusterIP bool - Finished bool - MSS int - MsgSize int - Type int + TestCaseIndex int + Output string + Code int + Worker string + Type TestType } -var testcases []*testcase -var currentJobIndex int - func init() { flag.StringVar(&mode, "mode", "worker", "Mode for the daemon (worker | orchestrator)") flag.StringVar(&port, "port", rpcServicePort, "Port to listen on (defaults to 5202)") - flag.StringVar(&host, "host", "", "IP address to bind to (defaults to 0.0.0.0)") flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom") flag.IntVar(&testTo, "testTo", 5, "end at test number testTo") workerStateMap = make(map[string]*workerState) - testcases = []*testcase{ - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 qperf TCP. Same VM using Pod IP", Type: qperfTCPTest, ClusterIP: false, MsgSize: msgSizeMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 qperf TCP. Same VM using Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 qperf TCP. Remote VM using Pod IP", Type: qperfTCPTest, ClusterIP: false, MsgSize: msgSizeMin}, - {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 qperf TCP. Remote VM using Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin}, - {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 qperf TCP. Hairpin Pod to own Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin}, - - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 iperf TCP. Same VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 iperf TCP. Same VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 iperf TCP. Remote VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin}, - {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 iperf TCP. Remote VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin}, - {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 iperf TCP. Hairpin Pod to own Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin}, - - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "6 iperf SCTP. Same VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "7 iperf SCTP. Same VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "8 iperf SCTP. Remote VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin}, - {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "9 iperf SCTP. Remote VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin}, - {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "10 iperf SCTP. Hairpin Pod to own Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "11 iperf UDP. Same VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "12 iperf UDP. Same VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "13 iperf UDP. Remote VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax}, - {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "14 iperf UDP. Remote VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "15 netperf. Same VM using Pod IP", Type: netperfTest, ClusterIP: false}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "16 netperf. Same VM using Virtual IP", Type: netperfTest, ClusterIP: true}, - {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "17 netperf. Remote VM using Pod IP", Type: netperfTest, ClusterIP: false}, - {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "18 netperf. Remote VM using Virtual IP", Type: netperfTest, ClusterIP: true}, - } - - currentJobIndex = 0 - - // Regexes to parse the Mbits/sec out of iperf TCP, SCTP, UDP and netperf output - iperfTCPOutputRegexp = regexp.MustCompile("SUM.*\\s+(\\d+)\\sMbits/sec\\s+receiver") - iperfSCTPOutputRegexp = regexp.MustCompile("SUM.*\\s+(\\d+)\\sMbits/sec\\s+receiver") - iperfUDPOutputRegexp = regexp.MustCompile("\\s+(\\S+)\\sMbits/sec\\s+\\S+\\s+ms\\s+") - netperfOutputRegexp = regexp.MustCompile("\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\S+\\s+(\\S+)\\s+") - iperfCPUOutputRegexp = regexp.MustCompile(`local/sender\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\),\sremote/receiver\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\)`) + results = make([]Result, 0) dataPoints = make(map[string][]point) } @@ -222,10 +175,10 @@ func main() { if !validateParams() { fmt.Println("Failed to parse cmdline args - fatal error - bailing out") os.Exit(1) - } grabEnv() - testcases = testcases[testFrom:testTo] + active_tests = make([]*TestCase, 0, testTo-testFrom) + active_tests = append(active_tests, testcases[testFrom:testTo]...) fmt.Println("Running as", mode, "...") if mode == orchestratorMode { orchestrate() @@ -241,8 +194,7 @@ func grabEnv() { podname = os.Getenv("HOSTNAME") } -func validateParams() (rv bool) { - rv = true +func validateParams() bool { if mode != workerMode && mode != orchestratorMode { fmt.Println("Invalid mode", mode) return false @@ -253,14 +205,10 @@ func validateParams() (rv bool) { return false } - if (len(host)) == 0 { - if mode == orchestratorMode { - host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") - } else { - host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") - } + if len(host) == 0 { + host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") } - return + return true } func allWorkersIdle() bool { @@ -272,104 +220,6 @@ func allWorkersIdle() bool { return true } -func getWorkerPodIP(worker string) string { - return workerStateMap[worker].IP -} - -func allocateWorkToClient(workerState *workerState, workItem *WorkItem) { - if !allWorkersIdle() { - workItem.IsIdle = true - return - } - - // System is all idle - pick up next work item to allocate to client - for n, v := range testcases { - if v.Finished { - continue - } - if v.SourceNode != workerState.worker { - workItem.IsIdle = true - return - } - if _, ok := workerStateMap[v.DestinationNode]; !ok { - workItem.IsIdle = true - return - } - fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d for MsgSize %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS, v.MsgSize) - workItem.ClientItem.Type = v.Type - workItem.IsClientItem = true - workerState.idle = false - currentJobIndex = n - - if !v.ClusterIP { - workItem.ClientItem.Host = getWorkerPodIP(v.DestinationNode) - } else { - workItem.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST") - } - - switch { - case v.Type == iperfTCPTest || v.Type == iperfUDPTest || v.Type == iperfSctpTest: - workItem.ClientItem.Port = "5201" - workItem.ClientItem.MSS = v.MSS - - v.MSS = v.MSS + mssStepSize - if v.MSS > mssMax { - v.Finished = true - } - return - case v.Type == qperfTCPTest: - workItem.ClientItem.MsgSize = v.MsgSize - v.MsgSize <<= 1 - if v.MsgSize > msgSizeMax { - v.Finished = true - } - return - case v.Type == netperfTest: - workItem.ClientItem.Port = "12865" - return - } - } - - for _, v := range testcases { - if !v.Finished { - return - } - } - - if !datapointsFlushed { - fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT") - flushDataPointsToCsv() - datapointsFlushed = true - } - - workItem.IsIdle = true -} - -// RegisterClient registers a single and assign a work item to it -func (t *NetPerfRPC) RegisterClient(data *ClientRegistrationData, workItem *WorkItem) error { - globalLock.Lock() - defer globalLock.Unlock() - - state, ok := workerStateMap[data.Worker] - - if !ok { - // For new clients, trigger an iperf server start immediately - state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker} - workerStateMap[data.Worker] = state - workItem.IsServerItem = true - workItem.ServerItem.ListenPort = "5201" - workItem.ServerItem.Timeout = 3600 - return nil - } - - // Worker defaults to idle unless the allocateWork routine below assigns an item - state.idle = true - - // Give the worker a new work item or let it idle loop another 5 seconds - allocateWorkToClient(state, workItem) - return nil -} - func writeOutputFile(filename, data string) { fd, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0666) if err != nil { @@ -427,138 +277,16 @@ func flushDataPointsToCsv() { fmt.Println("END CSV DATA") } -func parseIperfTCPBandwidth(output string) string { - // Parses the output of iperf3 and grabs the group Mbits/sec from the output - match := iperfTCPOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -func parseQperfTCPLatency(output string) string { - squeeze := func(s string) string { - return strings.Join(strings.Fields(s), " ") - } - - var bw, lat string - lines := strings.Split(output, "\n") - for i, line := range lines { - line = strings.TrimSpace(line) - if line == "tcp_bw:" { - bw = squeeze(lines[i+1]) - } else if line == "tcp_lat:" { - lat = squeeze(lines[i+1]) - } - } - - return fmt.Sprintf("(%s; %s)", bw, lat) -} - -func parseIperfSctpBandwidth(output string) string { - // Parses the output of iperf3 and grabs the group Mbits/sec from the output - match := iperfSCTPOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -func parseIperfUDPBandwidth(output string) string { - // Parses the output of iperf3 (UDP mode) and grabs the Mbits/sec from the output - match := iperfUDPOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -func parseIperfCPUUsage(output string) (string, string) { - // Parses the output of iperf and grabs the CPU usage on sender and receiver side from the output - match := iperfCPUOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1], match[4] - } - return "0", "0" -} - -func parseNetperfBandwidth(output string) string { - // Parses the output of netperf and grabs the Bbits/sec from the output - match := netperfOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -// ReceiveOutput processes a data received from a single client -func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, _ *int) error { - globalLock.Lock() - defer globalLock.Unlock() - - testcase := testcases[currentJobIndex] - - var outputLog string - var bw string - var cpuSender string - var cpuReceiver string - - switch data.Type { - case iperfTCPTest: - mss := testcases[currentJobIndex].MSS - mssStepSize - outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseIperfTCPBandwidth(data.Output) - cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output) - registerDataPoint(testcase.Label, mss, bw, currentJobIndex) - - case qperfTCPTest: - msgSize := testcases[currentJobIndex].MsgSize / 2 - outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MsgSize:", msgSize) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseQperfTCPLatency(data.Output) - cpuSender, cpuReceiver = "na", "na" - registerDataPoint(testcase.Label, msgSize, bw, currentJobIndex) - - case iperfSctpTest: - mss := testcases[currentJobIndex].MSS - mssStepSize - outputLog = outputLog + fmt.Sprintln("Received SCTP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseIperfSctpBandwidth(data.Output) - cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output) - registerDataPoint(testcase.Label, mss, bw, currentJobIndex) - - case iperfUDPTest: - mss := testcases[currentJobIndex].MSS - mssStepSize - outputLog = outputLog + fmt.Sprintln("Received UDP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseIperfUDPBandwidth(data.Output) - registerDataPoint(testcase.Label, mss, bw, currentJobIndex) - - case netperfTest: - outputLog = outputLog + fmt.Sprintln("Received netperf output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseNetperfBandwidth(data.Output) - registerDataPoint(testcase.Label, 0, bw, currentJobIndex) - testcases[currentJobIndex].Finished = true - - } - - switch data.Type { - case iperfTCPTest, iperfSctpTest: - fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec. CPU usage sender was", cpuSender, "%. CPU usage receiver was", cpuReceiver, "%.") - case qperfTCPTest: - fmt.Println("Jobdone from worker QPERF", data.Worker, "Bandwidth, Latency was", bw, "CPU usage sender was", cpuSender, "%. CPU usage receiver was", cpuReceiver, "%.") - default: - fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec") +func flushResultJsonData() { + jsonData, err := json.MarshalIndent(results, "", " ") + if err != nil { + fmt.Println("Error generating JSON:", err) + return } - return nil + fmt.Println(jsonDataMarker) + fmt.Println(string(jsonData)) + fmt.Println(jsonEndDataMarker) } func serveRPCRequests(port string) { @@ -610,31 +338,13 @@ func getMyIP() string { } func handleClientWorkItem(client *rpc.Client, workItem *WorkItem) { - fmt.Println("Orchestrator requests worker run item Type:", workItem.ClientItem.Type) - switch { - case workItem.ClientItem.Type == iperfTCPTest || workItem.ClientItem.Type == iperfUDPTest || workItem.ClientItem.Type == iperfSctpTest: - outputString := iperfClient(workItem.ClientItem.Host, workItem.ClientItem.MSS, workItem.ClientItem.Type) - var reply int - err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) - if err != nil { - log.Fatal("failed to call client", err) - } - case workItem.ClientItem.Type == qperfTCPTest: - outputString := qperfClient(workItem.ClientItem.Host, workItem.ClientItem.Type, workItem.ClientItem.MsgSize) - var reply int - err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) - if err != nil { - log.Fatal("failed to call client", err) - } - case workItem.ClientItem.Type == netperfTest: - outputString := netperfClient(workItem.ClientItem.Host) - var reply int - err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) - if err != nil { - log.Fatal("failed to call client", err) - } + testCase := active_tests[workItem.TestCaseIndex] + outputString := testCase.TestRunner(workItem.ClientItem) + var reply int + err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: testCase.Type, TestCaseIndex: workItem.TestCaseIndex}, &reply) + if err != nil { + log.Fatal("failed to call client", err) } - // Client COOLDOWN period before asking for next work item to replenish burst allowance policers etc time.Sleep(10 * time.Second) } @@ -646,29 +356,29 @@ func isIPv6(address string) bool { // startWork : Entry point to the worker infinite loop func startWork() { - for true { - var timeout time.Duration + for { var client *rpc.Client var err error + // Address recieved via command line address := host if isIPv6(address) { address = "[" + address + "]" } - timeout = 5 - for true { + for { fmt.Println("Attempting to connect to orchestrator at", host) client, err = rpc.DialHTTP("tcp", address+":"+port) if err == nil { break } fmt.Println("RPC connection to ", host, " failed:", err) - time.Sleep(timeout * time.Second) + time.Sleep(5 * time.Second) } - for true { + for { clientData := ClientRegistrationData{Host: podname, KubeNode: kubenode, Worker: worker, IP: getMyIP()} + var workItem WorkItem if err := client.Call("NetPerfRPC.RegisterClient", clientData, &workItem); err != nil { @@ -678,18 +388,18 @@ func startWork() { } switch { - case workItem.IsIdle == true: + case workItem.IsIdle: time.Sleep(5 * time.Second) continue - case workItem.IsServerItem == true: + case workItem.IsServerItem: fmt.Println("Orchestrator requests worker run iperf and netperf servers") go iperfServer() go qperfServer() go netperfServer() time.Sleep(1 * time.Second) - case workItem.IsClientItem == true: + case workItem.IsClientItem: handleClientWorkItem(client, &workItem) } } @@ -698,10 +408,8 @@ func startWork() { // Invoke and indefinitely run an iperf server func iperfServer() { - output, success := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60"}, 15) - if success { - fmt.Println(output) - } + output, _ := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60", "-D"}, 15) + fmt.Println(output) } // Invoke and indefinitely run an qperf server @@ -720,77 +428,136 @@ func netperfServer() { } } -// Invoke and run an iperf client and return the output if successful. -func iperfClient(serverHost string, mss int, workItemType int) (rv string) { - switch { - case workItemType == iperfTCPTest: - output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss)}, 15) - if success { - rv = output - } - - case workItemType == iperfSctpTest: - output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss), "--sctp"}, 15) - if success { - rv = output - } +func cmdExec(command string, args []string, _ int32) (rv string, rc bool) { + cmd := exec.Cmd{Path: command, Args: args} - case workItemType == iperfUDPTest: - output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-i", "30", "-t", "10", "-f", "m", "-b", "0", "-u"}, 15) - if success { - rv = output - } + var stdoutput bytes.Buffer + var stderror bytes.Buffer + cmd.Stdout = &stdoutput + cmd.Stderr = &stderror + if err := cmd.Run(); err != nil { + outputstr := stdoutput.String() + errstr := stderror.String() + fmt.Println("Failed to run", outputstr, "error:", errstr, err) + return } + + rv = stdoutput.String() + rc = true return } -// Invoke and run an qperf client and return the output if successful. -func qperfClient(serverHost string, workItemType, msgSize int) (rv string) { +func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, _ *int) error { + globalLock.Lock() + defer globalLock.Unlock() - str := fmt.Sprint + fmt.Println("ReceiveOutput WorkItem TestCaseIndex: ", data.TestCaseIndex) + testcase := active_tests[data.TestCaseIndex] - switch { - case workItemType == qperfTCPTest: - output, success := cmdExec(qperfPath, []string{ - qperfPath, "-ip", "19766", "-m", str(msgSize), serverHost, "tcp_bw", "tcp_lat", - }, 15) - if success { - rv = output - } - default: - fmt.Println("unknown work item type: ", workItemType) + outputLog := fmt.Sprintln("Received output from worker", data.Worker, "for test", testcase.Label, + "from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output + writeOutputFile(outputCaptureFile, outputLog) + + if testcase.BandwidthParser != nil { + bw, mss := testcase.BandwidthParser(data.Output) + registerDataPoint(testcase.Label, mss, fmt.Sprintf("%f", bw), data.TestCaseIndex) + fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec") } - return + + if testcase.JsonParser != nil { + addResult( + fmt.Sprintf("%s with MSS: %d", testcase.Label, testcase.MSS-mssStepSize), + testcase.JsonParser(data.Output), + ) + fmt.Println("Jobdone from worker", data.Worker, "JSON output generated") + } + + return nil } -// Invoke and run a netperf client and return the output if successful. -func netperfClient(serverHost string) (rv string) { - output, success := cmdExec(netperfPath, []string{netperfPath, "-H", serverHost}, 15) - if success { - fmt.Println(output) - rv = output - } else { - fmt.Println("Error running netperf client", output) +func (t *NetPerfRPC) RegisterClient(data ClientRegistrationData, workItem *WorkItem) error { + globalLock.Lock() + defer globalLock.Unlock() + + state, ok := workerStateMap[data.Worker] + + if !ok { + // For new clients, trigger an iperf server start immediately + state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker} + workerStateMap[data.Worker] = state + workItem.IsServerItem = true + workItem.ServerItem.ListenPort = "5201" + workItem.ServerItem.Timeout = 3600 + return nil } - return + // Worker defaults to idle unless the allocateWork routine below assigns an item + state.idle = true + + // Give the worker a new work item or let it idle loop another 5 seconds + allocateWorkToClient(state, workItem) + return nil } -func cmdExec(command string, args []string, _ int32) (rv string, rc bool) { - cmd := exec.Cmd{Path: command, Args: args} +func allocateWorkToClient(workerState *workerState, workItem *WorkItem) { + if !allWorkersIdle() { + workItem.IsIdle = true + return + } + + // System is all idle - pick up next work item to allocate to client + for n, v := range active_tests { + if v.Finished { + continue + } + if v.SourceNode != workerState.worker { + workItem.IsIdle = true + return + } + if _, ok := workerStateMap[v.DestinationNode]; !ok { + workItem.IsIdle = true + return + } + fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d for MsgSize %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS, v.MsgSize) + workItem.IsClientItem = true + workItem.TestCaseIndex = n + workerState.idle = false + + if !v.ClusterIP { + workItem.ClientItem.Host = workerStateMap[workerState.worker].IP + } else { + workItem.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST") + } + + workItem.ClientItem.Params = v.TestParams + + if v.MSS != 0 && v.MSS < mssMax { + v.MSS += mssStepSize + } else { + v.Finished = true + } + + if v.Type == netperfTest { + workItem.ClientItem.Port = "12865" + } else { + workItem.ClientItem.Port = "5201" + } - var stdoutput bytes.Buffer - var stderror bytes.Buffer - cmd.Stdout = &stdoutput - cmd.Stderr = &stderror - if err := cmd.Run(); err != nil { - outputstr := stdoutput.String() - errstr := stderror.String() - fmt.Println("Failed to run", outputstr, "error:", errstr, err) return } - rv = stdoutput.String() - rc = true - return + for _, v := range active_tests { + if !v.Finished { + return + } + } + + if !datapointsFlushed { + fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT") + flushDataPointsToCsv() + flushResultJsonData() + datapointsFlushed = true + } + + workItem.IsIdle = true } diff --git a/network/benchmarks/netperf/nptest/nptest_test.go b/network/benchmarks/netperf/nptest/nptest_test.go deleted file mode 100644 index c651feb814..0000000000 --- a/network/benchmarks/netperf/nptest/nptest_test.go +++ /dev/null @@ -1,36 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import "testing" - -func TestParseQperfTCPLatency(t *testing.T) { - input := ` -tcp_bw: - bw = 5.07 GB/sec -tcp_lat: - latency = 15.6 us -` - - expected := "(bw = 5.07 GB/sec; latency = 15.6 us)" - output := parseQperfTCPLatency(input) - - if output != expected { - t.Fatalf("Expected: %s, Got: %s", expected, output) - } - -} diff --git a/network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go b/network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go new file mode 100644 index 0000000000..81cf321923 --- /dev/null +++ b/network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go @@ -0,0 +1,43 @@ +package parsers + +import ( + "encoding/json" + "regexp" + "strconv" +) + +func ParseIperfTCPBandwidth(output string) (bw float64, mss int) { + var iperfTcpoutput IperfTcpCommandOutput + + err := json.Unmarshal([]byte(output), &iperfTcpoutput) + if err != nil { + return 0, 0 + } + + bw = iperfTcpoutput.End.SumSent.BitsPerSecond / 1e6 + mss = iperfTcpoutput.Start.TcpMss + + return bw, mss +} + +func ParseIperfUDPBandwidth(output string) (bw float64, mss int) { + var iperfUdpOutput IperfUdpCommandOutput + + err := json.Unmarshal([]byte(output), &iperfUdpOutput) + if err != nil { + return 0, 0 + } + + return iperfUdpOutput.End.Sum.BitsPerSecond / 1e6, 0 +} + +func ParseNetperfBandwidth(output string) (bw float64, mss int) { + // Parses the output of netperf and grabs the Bbits/sec from the output + netperfOutputRegexp := regexp.MustCompile("\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\S+\\s+(\\S+)\\s+") + match := netperfOutputRegexp.FindStringSubmatch(output) + if len(match) > 1 { + floatVal, _ := strconv.ParseFloat(match[1], 64) + return floatVal, 0 + } + return 0, 0 +} diff --git a/network/benchmarks/netperf/nptest/parsers/json_parsers.go b/network/benchmarks/netperf/nptest/parsers/json_parsers.go new file mode 100644 index 0000000000..632cc9fa82 --- /dev/null +++ b/network/benchmarks/netperf/nptest/parsers/json_parsers.go @@ -0,0 +1,85 @@ +package parsers + +import ( + "encoding/json" + "math/bits" +) + +func ParseIperfTcpResults(output string) string { + var iperfOutput IperfTcpCommandOutput + + err := json.Unmarshal([]byte(output), &iperfOutput) + if err != nil { + return "{\"error\": \"Failed to parse JSON output\", \"message\": \"" + err.Error() + "\"}" + } + + // Calculate the min, max and mean rtts by aggregating the streams + var sumMeanRtt uint + var minRtt uint = 1<