diff --git a/README.md b/README.md index 80e45e9..cf5327c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # jbpf-protobuf +**NOTE: This project uses an experimental feature from jbpf. It is not meant to be used in production environments.** + This repository is an extension for [jbpf](https://github.com/microsoft/jbpf/) demonstrating how to utilize protobuf serialization as part of jbpf. Prerequisites: @@ -7,7 +9,8 @@ Prerequisites: * Go v1.23.2+ * Make * Pip -* Python +* Python3 +* Protocol Buffer Compiler (protoc) The project utilizes [Nanopb](https://github.com/nanopb/nanopb) to generate C structures for given protobuf specs that use contiguous memory. It also generates serializer libraries that can be provided to jbpf, to encode output and decode input data to seamlessly integrate external data processing systems. @@ -17,6 +20,9 @@ The project utilizes [Nanopb](https://github.com/nanopb/nanopb) to generate C st # init submodules: ./init_submodules.sh +# Install nanopb pip packages: +python3 -m pip install -r 3p/nanopb/requirements.txt + # source environment variables source ./setup_jbpfp_env.sh @@ -34,7 +40,7 @@ docker build -t jbpf_protobuf_builder:latest -f deploy/Dockerfile . ## Running the examples -In order to run any of the samples, you'll need to build Janus. +In order to run any of the samples, you'll need to build jbpf. ```sh mkdir -p jbpf/build diff --git a/docs/design.md b/docs/design.md new file mode 100644 index 0000000..3fdef67 --- /dev/null +++ b/docs/design.md @@ -0,0 +1,94 @@ +# High level Architecture + +`jbpf_protobuf_cli` provides tooling to generate serialization assets for `jbpf` using protobuf. + +For complete details of each subcommand, see `./jbpf_protobuf_cli {SUBCOMMAND} --help`. + +![architecture](./jbpf_arch.png) + +## Serde + +The `serde` subcommand generates assets from protobuf specs which can integrate with `jbpf`'s [serde functionality](https://github.com/microsoft/jbpf/blob/main/docs/serde.md). + +Developers must write `.proto` file(s) defining the models that are to be serialized. Additionally, they must provide [generator options](https://jpa.kapsi.fi/nanopb/docs/reference.html#generator-options) as defined by nanopb to ensure generated structs can be defined in C as contiguous memory structs. + + +### Simple example + +This example goes through generating serde assets for a simple protobuf schema.
+ ``` +// schema.proto +syntax = "proto2"; + +message my_struct { + required int32 value = 1; + required string name = 2; +} + +// schema.options +my_struct.name max_size:32 +``` + +```sh +# To see all flags and options available, see +./jbpf_protobuf_cli serde --help + +# Generate the jbpf serde assets for the above proto spec +./jbpf_protobuf_cli serde -s schema:my_struct +``` + +This will generate the following files: +* `schema:my_struct_serializer.c`: + ```c + #define PB_FIELD_32BIT 1 + #include <stdint.h> + #include <pb_encode.h> + #include <pb_decode.h> + #include "schema.pb.h" + + const uint32_t proto_message_size = sizeof(my_struct); + + int jbpf_io_serialize(void* input_msg_buf, size_t input_msg_buf_size, char* serialized_data_buf, size_t serialized_data_buf_size) { + if (input_msg_buf_size != proto_message_size) + return -1; + + pb_ostream_t ostream = pb_ostream_from_buffer((uint8_t*)serialized_data_buf, serialized_data_buf_size); + if (!pb_encode(&ostream, my_struct_fields, input_msg_buf)) + return -1; + + return ostream.bytes_written; + } + + int jbpf_io_deserialize(char* serialized_data_buf, size_t serialized_data_buf_size, void* output_msg_buf, size_t output_msg_buf_size) { + if (output_msg_buf_size != proto_message_size) + return 0; + + pb_istream_t istream = pb_istream_from_buffer((uint8_t*)serialized_data_buf, serialized_data_buf_size); + return pb_decode(&istream, my_struct_fields, output_msg_buf); + } + ``` +* `schema:my_struct_serializer.so` is the compiled shared object library of `schema:my_struct_serializer.c`. +* `schema.pb` is the compiled protobuf spec. +* `schema.pb.c` is the generated nanopb constant definitions. +* `schema.pb.h` is the generated nanopb header file. + +When loading the codelet description, you can provide the generated `{schema}:{message_name}_serializer.so` as the io_channel `serde.file_path`. + +Additionally, you can provide the `{schema}.pb` to a decoder to be able to dynamically decode/encode the protobuf messages. + +To see detailed usage, run `jbpf_protobuf_cli serde --help`. + +## Decoder + +The CLI tool also provides a `decoder` subcommand which can be run locally to receive and print protobuf messages sent over a UDP channel. The examples [example_collect_control](../examples/first_example_ipc/example_collect_control.cpp) and [first_example_standalone](../examples/first_example_standalone/example_app.cpp) bind to a UDP socket on port 20788 to send output data from jbpf, which matches the default UDP socket for the decoder. + +This is useful for debugging output from jbpf and provides an example of how someone might dynamically decode output from jbpf by providing `.pb` schemas along with the associated stream identifier. + +To see detailed usage, run `jbpf_protobuf_cli decoder --help`. + +## Input Forwarder + +The tool also provides the ability to dynamically send protobuf input to jbpf from an external entity. It uses a TCP socket to send input channel messages to a jbpf instance. The examples [example_collect_control](../examples/first_example_ipc/example_collect_control.cpp) and [first_example_standalone](../examples/first_example_standalone/example_app.cpp) bind to a TCP socket on port 20787 to receive input data for jbpf, which matches the default TCP socket for the input forwarder. + +To see detailed usage, run `jbpf_protobuf_cli input forward --help`.
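As a quick orientation for how the decoder and the input forwarder fit together, the sketch below mirrors the helper scripts in `examples/first_example_ipc` (`run_decoder.sh`, `load.sh`, `send_input_msg.sh`). It assumes `JBPFP_PATH` and `JBPF_PATH` are set (for example via `setup_jbpfp_env.sh`), that jbpf and the example codelets are built, and that a codeletset description `codeletset_load_request.yaml` exists; the ports are the defaults noted above.

```sh
# Terminal 1: run the decoder (receives jbpf output on UDP port 20788 by default)
$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder run

# Terminal 2: register the codeletset's compiled .pb schemas with the decoder,
# then load the codeletset into jbpf
$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder load -c codeletset_load_request.yaml
$JBPF_PATH/out/bin/jbpf_lcm_cli -l -c codeletset_load_request.yaml

# Forward a JSON payload to an input channel over TCP (port 20787 by default)
$JBPFP_PATH/pkg/jbpf_protobuf_cli input forward -c codeletset_load_request.yaml \
    --stream-id 11111111-1111-1111-1111-111111111111 \
    --inline-json '{"value": 101}'
```

The same codeletset YAML is consumed by both `jbpf_lcm_cli` and this CLI; `decoder load` and `input forward` only read each `codelet_descriptor` entry's `out_io_channel`/`in_io_channel` lists, using `stream_id` together with `serde.protobuf.package_path` and `msg_name` to map stream IDs to compiled `.pb` schemas.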
diff --git a/docs/jbpf_arch.png b/docs/jbpf_arch.png new file mode 100755 index 0000000..345f44e Binary files /dev/null and b/docs/jbpf_arch.png differ diff --git a/examples/first_example_ipc/README.md b/examples/first_example_ipc/README.md index 82b4f56..2e243e1 100644 --- a/examples/first_example_ipc/README.md +++ b/examples/first_example_ipc/README.md @@ -95,7 +95,7 @@ INFO[0010] {"seqNo":7, "value":-7, "name":"instance 7"} streamUUID=00112233-445 To send a manual control message to the `example_app`, we run the command: ```sh -$ ./send_control.sh 101 +$ ./send_input_msg.sh 101 ``` This should trigger a message in the `example_app`: diff --git a/examples/first_example_ipc/load.sh b/examples/first_example_ipc/load.sh index b16b217..0269ff0 100755 --- a/examples/first_example_ipc/load.sh +++ b/examples/first_example_ipc/load.sh @@ -2,6 +2,6 @@ set -e -$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder load -c codeletset_load_request.yaml --decoder-control-ip 0.0.0.0 +$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder load -c codeletset_load_request.yaml $JBPF_PATH/out/bin/jbpf_lcm_cli -l -c codeletset_load_request.yaml diff --git a/examples/first_example_ipc/run_decoder.sh b/examples/first_example_ipc/run_decoder.sh index 48ccb33..0458541 100755 --- a/examples/first_example_ipc/run_decoder.sh +++ b/examples/first_example_ipc/run_decoder.sh @@ -1,3 +1,3 @@ #!/bin/sh -$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder run --jbpf-enable +$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder run diff --git a/examples/first_example_ipc/send_control.sh b/examples/first_example_ipc/send_input_msg.sh similarity index 53% rename from examples/first_example_ipc/send_control.sh rename to examples/first_example_ipc/send_input_msg.sh index 05cff25..b00c4df 100755 --- a/examples/first_example_ipc/send_control.sh +++ b/examples/first_example_ipc/send_input_msg.sh @@ -1,5 +1,6 @@ #!/bin/sh -$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder control \ +$JBPFP_PATH/pkg/jbpf_protobuf_cli input forward \ + -c codeletset_load_request.yaml \ --stream-id 11111111-1111-1111-1111-111111111111 \ --inline-json "{\"value\": $1}" diff --git a/examples/first_example_standalone/README.md b/examples/first_example_standalone/README.md index dbab1b5..a72011f 100644 --- a/examples/first_example_standalone/README.md +++ b/examples/first_example_standalone/README.md @@ -69,7 +69,7 @@ INFO[0010] {"seqNo":7, "value":-7, "name":"instance 7"} streamUUID=00112233-445 To send a manual control message to the `example_app`, we run the command: ```sh -$ ./send_control.sh 101 +$ ./send_input_msg.sh 101 ``` This should trigger a message in the `example_app`: diff --git a/examples/first_example_standalone/load.sh b/examples/first_example_standalone/load.sh index b16b217..0269ff0 100755 --- a/examples/first_example_standalone/load.sh +++ b/examples/first_example_standalone/load.sh @@ -2,6 +2,6 @@ set -e -$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder load -c codeletset_load_request.yaml --decoder-control-ip 0.0.0.0 +$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder load -c codeletset_load_request.yaml $JBPF_PATH/out/bin/jbpf_lcm_cli -l -c codeletset_load_request.yaml diff --git a/examples/first_example_standalone/run_decoder.sh b/examples/first_example_standalone/run_decoder.sh index 48ccb33..0458541 100755 --- a/examples/first_example_standalone/run_decoder.sh +++ b/examples/first_example_standalone/run_decoder.sh @@ -1,3 +1,3 @@ #!/bin/sh -$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder run --jbpf-enable +$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder run diff --git 
a/examples/first_example_standalone/send_control.sh b/examples/first_example_standalone/send_input_msg.sh similarity index 53% rename from examples/first_example_standalone/send_control.sh rename to examples/first_example_standalone/send_input_msg.sh index 05cff25..b00c4df 100755 --- a/examples/first_example_standalone/send_control.sh +++ b/examples/first_example_standalone/send_input_msg.sh @@ -1,5 +1,6 @@ #!/bin/sh -$JBPFP_PATH/pkg/jbpf_protobuf_cli decoder control \ +$JBPFP_PATH/pkg/jbpf_protobuf_cli input forward \ + -c codeletset_load_request.yaml \ --stream-id 11111111-1111-1111-1111-111111111111 \ --inline-json "{\"value\": $1}" diff --git a/jbpf b/jbpf index 5709617..d821e42 160000 --- a/jbpf +++ b/jbpf @@ -1 +1 @@ -Subproject commit 5709617993d3fd719f62f12893c6cfa50556509f +Subproject commit d821e42e56884ebdf5d66520ed2df30f136a90af diff --git a/pkg/cmd/decoder/control/control.go b/pkg/cmd/decoder/control/control.go deleted file mode 100644 index dd813e3..0000000 --- a/pkg/cmd/decoder/control/control.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. - -package control - -import ( - "encoding/json" - "errors" - "fmt" - "jbpf_protobuf_cli/common" - "jbpf_protobuf_cli/schema" - "os" - - "github.com/google/uuid" - "github.com/spf13/cobra" - "github.com/spf13/pflag" -) - -type runOptions struct { - schema *schema.ClientOptions - general *common.GeneralOptions - - filePath string - inlineJSON string - payload string - streamID string - streamUUID uuid.UUID -} - -func addToFlags(flags *pflag.FlagSet, opts *runOptions) { - flags.StringVarP(&opts.filePath, "file", "f", "", "path to file containing payload in JSON format") - flags.StringVarP(&opts.inlineJSON, "inline-json", "j", "", "inline payload in JSON format") - flags.StringVar(&opts.streamID, "stream-id", "00000000-0000-0000-0000-000000000000", "stream ID") -} - -func (o *runOptions) parse() error { - if (len(o.inlineJSON) > 0 && len(o.filePath) > 0) || (len(o.inlineJSON) == 0 && len(o.filePath) == 0) { - return errors.New("exactly one of --file or --inline-json can be specified") - } - - if len(o.filePath) != 0 { - if fi, err := os.Stat(o.filePath); err != nil { - return err - } else if fi.IsDir() { - return fmt.Errorf(`expected "%s" to be a file, got a directory`, o.filePath) - } - payload, err := os.ReadFile(o.filePath) - if err != nil { - return err - } - var deserializedPayload interface{} - err = json.Unmarshal(payload, &deserializedPayload) - if err != nil { - return err - } - o.payload = string(payload) - } else { - var deserializedPayload interface{} - err := json.Unmarshal([]byte(o.inlineJSON), &deserializedPayload) - if err != nil { - return err - } - o.payload = o.inlineJSON - } - - var err error - o.streamUUID, err = uuid.Parse(o.streamID) - if err != nil { - return err - } - - return nil -} - -// Command Load a schema to a local decoder -func Command(opts *common.GeneralOptions) *cobra.Command { - runOptions := &runOptions{ - schema: &schema.ClientOptions{}, - general: opts, - } - cmd := &cobra.Command{ - Use: "control", - Short: "Load a control message via a local decoder", - Long: "Load a control message via a local decoder", - RunE: func(cmd *cobra.Command, _ []string) error { - return run(cmd, runOptions) - }, - SilenceUsage: true, - } - addToFlags(cmd.PersistentFlags(), runOptions) - schema.AddClientOptionsToFlags(cmd.PersistentFlags(), runOptions.schema) - return cmd -} - -func run(cmd *cobra.Command, opts *runOptions) error { - if err := errors.Join( - 
opts.general.Parse(), - opts.schema.Parse(), - opts.parse(), - ); err != nil { - return err - } - - logger := opts.general.Logger - - client, err := schema.NewClient(cmd.Context(), logger, opts.schema) - if err != nil { - return err - } - - return client.SendControl(opts.streamUUID, string(opts.payload)) -} diff --git a/pkg/cmd/decoder/decoder.go b/pkg/cmd/decoder/decoder.go index 6a2a1e7..26ea3a3 100644 --- a/pkg/cmd/decoder/decoder.go +++ b/pkg/cmd/decoder/decoder.go @@ -3,7 +3,6 @@ package decoder import ( - "jbpf_protobuf_cli/cmd/decoder/control" "jbpf_protobuf_cli/cmd/decoder/load" "jbpf_protobuf_cli/cmd/decoder/run" "jbpf_protobuf_cli/cmd/decoder/unload" @@ -20,7 +19,6 @@ func Command(opts *common.GeneralOptions) *cobra.Command { Short: "Execute a decoder subcommand", } cmd.AddCommand( - control.Command(opts), load.Command(opts), unload.Command(opts), run.Command(opts), diff --git a/pkg/cmd/decoder/load/load.go b/pkg/cmd/decoder/load/load.go index d284d27..03f88d5 100644 --- a/pkg/cmd/decoder/load/load.go +++ b/pkg/cmd/decoder/load/load.go @@ -13,34 +13,32 @@ import ( ) type runOptions struct { - schema *schema.ClientOptions - general *common.GeneralOptions + decoderAPI *schema.Options + general *common.GeneralOptions compiledProtos map[string]*common.File configFiles []string - configs []DecoderLoadConfig + configs []*common.CodeletsetConfig } func addToFlags(flags *pflag.FlagSet, opts *runOptions) { flags.StringArrayVarP(&opts.configFiles, "config", "c", []string{}, "configuration files to load") } -func (o *runOptions) parse() error { - configs, compiledProtos, err := fromFiles(o.configFiles...) +func (o *runOptions) parse() (err error) { + o.configs, err = common.CodeletsetConfigFromFiles(o.configFiles...) if err != nil { - return err + return } - o.configs = configs - o.compiledProtos = compiledProtos - - return nil + o.compiledProtos, err = common.LoadCompiledProtos(o.configs, false, true) + return } // Command Load a schema to a local decoder func Command(opts *common.GeneralOptions) *cobra.Command { runOptions := &runOptions{ - schema: &schema.ClientOptions{}, - general: opts, + decoderAPI: &schema.Options{}, + general: opts, } cmd := &cobra.Command{ Use: "load", @@ -52,14 +50,14 @@ func Command(opts *common.GeneralOptions) *cobra.Command { SilenceUsage: true, } addToFlags(cmd.PersistentFlags(), runOptions) - schema.AddClientOptionsToFlags(cmd.PersistentFlags(), runOptions.schema) + schema.AddOptionsToFlags(cmd.PersistentFlags(), runOptions.decoderAPI) return cmd } func run(cmd *cobra.Command, opts *runOptions) error { if err := errors.Join( opts.general.Parse(), - opts.schema.Parse(), + opts.decoderAPI.Parse(), opts.parse(), ); err != nil { return err @@ -67,7 +65,7 @@ func run(cmd *cobra.Command, opts *runOptions) error { logger := opts.general.Logger - client, err := schema.NewClient(cmd.Context(), logger, opts.schema) + client, err := schema.NewClient(cmd.Context(), logger, opts.decoderAPI) if err != nil { return err } @@ -76,28 +74,18 @@ func run(cmd *cobra.Command, opts *runOptions) error { for _, config := range opts.configs { for _, desc := range config.CodeletDescriptor { - for _, io := range desc.InIOChannel { - if existing, ok := schemas[io.Serde.Protobuf.protoPackageName]; ok { - existing.Streams[io.streamUUID] = io.Serde.Protobuf.MsgName - } else { - compiledProto := opts.compiledProtos[io.Serde.Protobuf.absPackagePath] - schemas[io.Serde.Protobuf.protoPackageName] = &schema.LoadRequest{ - CompiledProto: compiledProto.Data, - Streams: map[uuid.UUID]string{ - 
io.streamUUID: io.Serde.Protobuf.MsgName, - }, - } - } - } for _, io := range desc.OutIOChannel { - if existing, ok := schemas[io.Serde.Protobuf.protoPackageName]; ok { - existing.Streams[io.streamUUID] = io.Serde.Protobuf.MsgName + if existing, ok := schemas[io.Serde.Protobuf.PackageName]; ok { + existing.Streams[io.StreamUUID] = io.Serde.Protobuf.MsgName } else { - compiledProto := opts.compiledProtos[io.Serde.Protobuf.absPackagePath] - schemas[io.Serde.Protobuf.protoPackageName] = &schema.LoadRequest{ + compiledProto, ok := opts.compiledProtos[io.Serde.Protobuf.PackagePath] + if !ok { + return errors.New("compiled proto not found") + } + schemas[io.Serde.Protobuf.PackageName] = &schema.LoadRequest{ CompiledProto: compiledProto.Data, Streams: map[uuid.UUID]string{ - io.streamUUID: io.Serde.Protobuf.MsgName, + io.StreamUUID: io.Serde.Protobuf.MsgName, }, } } diff --git a/pkg/cmd/decoder/load/load_config.go b/pkg/cmd/decoder/load/load_config.go deleted file mode 100644 index a2fcdec..0000000 --- a/pkg/cmd/decoder/load/load_config.go +++ /dev/null @@ -1,113 +0,0 @@ -package load - -import ( - "errors" - "fmt" - "jbpf_protobuf_cli/common" - "os" - "path/filepath" - "strings" - - "github.com/google/uuid" - "gopkg.in/yaml.v3" -) - -// ProtobufConfig represents the configuration for a protobuf message -type ProtobufConfig struct { - MsgName string `yaml:"msg_name"` - PackagePath string `yaml:"package_path"` - - absPackagePath string - protoPackageName string -} - -// SerdeConfig represents the configuration for serialize/deserialize -type SerdeConfig struct { - Protobuf *ProtobufConfig `yaml:"protobuf"` -} - -// IOChannelConfig represents the configuration for an IO channel -type IOChannelConfig struct { - Serde *SerdeConfig `yaml:"serde"` - StreamID string `yaml:"stream_id"` - - streamUUID uuid.UUID -} - -// CodeletDescriptorConfig represents the configuration for a codelet descriptor -type CodeletDescriptorConfig struct { - InIOChannel []*IOChannelConfig `yaml:"in_io_channel"` - OutIOChannel []*IOChannelConfig `yaml:"out_io_channel"` -} - -// DecoderLoadConfig represents the configuration for loading a decoder -type DecoderLoadConfig struct { - CodeletDescriptor []*CodeletDescriptorConfig `yaml:"codelet_descriptor"` -} - -func (io *IOChannelConfig) verify(compiledProtos map[string]*common.File) error { - streamUUID, err := uuid.Parse(io.StreamID) - if err != nil { - return err - } - io.streamUUID = streamUUID - if io.Serde == nil || io.Serde.Protobuf == nil || io.Serde.Protobuf.PackagePath == "" { - return fmt.Errorf("missing required field package_path") - } - - io.Serde.Protobuf.absPackagePath = os.ExpandEnv(io.Serde.Protobuf.PackagePath) - basename := filepath.Base(io.Serde.Protobuf.absPackagePath) - io.Serde.Protobuf.protoPackageName = strings.TrimSuffix(basename, filepath.Ext(basename)) - - if _, ok := compiledProtos[io.Serde.Protobuf.absPackagePath]; !ok { - protoPkg, err := common.NewFile(io.Serde.Protobuf.absPackagePath) - if err != nil { - return err - } - compiledProtos[io.Serde.Protobuf.absPackagePath] = protoPkg - } - - return nil -} - -func fromFiles(configs ...string) ([]DecoderLoadConfig, map[string]*common.File, error) { - out := make([]DecoderLoadConfig, 0, len(configs)) - compiledProtos := make(map[string]*common.File) - errs := make([]error, 0, len(configs)) - -configLoad: - for _, c := range configs { - f, err := common.NewFile(c) - if err != nil { - errs = append(errs, fmt.Errorf("failed to read file %s: %w", c, err)) - continue - } - var config DecoderLoadConfig - if err 
:= yaml.Unmarshal(f.Data, &config); err != nil { - errs = append(errs, fmt.Errorf("failed to unmarshal file %s: %w", c, err)) - continue - } - - for _, desc := range config.CodeletDescriptor { - for _, io := range desc.InIOChannel { - if err := io.verify(compiledProtos); err != nil { - errs = append(errs, fmt.Errorf("failed to verify in_io_channel in file %s: %w", c, err)) - continue configLoad - } - } - for _, io := range desc.OutIOChannel { - if err := io.verify(compiledProtos); err != nil { - errs = append(errs, fmt.Errorf("failed to verify out_io_channel in file %s: %w", c, err)) - continue configLoad - } - } - } - - out = append(out, config) - } - if err := errors.Join(errs...); err != nil { - return nil, nil, err - } - - return out, compiledProtos, nil -} diff --git a/pkg/cmd/decoder/run/run.go b/pkg/cmd/decoder/run/run.go index df508e5..1e96e0e 100644 --- a/pkg/cmd/decoder/run/run.go +++ b/pkg/cmd/decoder/run/run.go @@ -15,17 +15,17 @@ import ( ) type runOptions struct { - general *common.GeneralOptions - data *data.ServerOptions - schema *schema.ServerOptions + general *common.GeneralOptions + data *data.ServerOptions + decoderAPI *schema.Options } // Command Run decoder to collect, decode and print jbpf output func Command(opts *common.GeneralOptions) *cobra.Command { runOptions := &runOptions{ - general: opts, - data: &data.ServerOptions{}, - schema: &schema.ServerOptions{}, + general: opts, + data: &data.ServerOptions{}, + decoderAPI: &schema.Options{}, } cmd := &cobra.Command{ Use: "run", @@ -36,7 +36,7 @@ func Command(opts *common.GeneralOptions) *cobra.Command { }, SilenceUsage: true, } - schema.AddServerOptionsToFlags(cmd.PersistentFlags(), runOptions.schema) + schema.AddOptionsToFlags(cmd.PersistentFlags(), runOptions.decoderAPI) data.AddServerOptionsToFlags(cmd.PersistentFlags(), runOptions.data) return cmd } @@ -45,7 +45,7 @@ func run(cmd *cobra.Command, opts *runOptions) error { if err := errors.Join( opts.general.Parse(), opts.data.Parse(), - opts.schema.Parse(), + opts.decoderAPI.Parse(), ); err != nil { return err } @@ -54,10 +54,7 @@ func run(cmd *cobra.Command, opts *runOptions) error { store := schema.NewStore() - schemaServer, err := schema.NewServer(cmd.Context(), logger, opts.schema, store) - if err != nil { - return err - } + schemaServer := schema.NewServer(cmd.Context(), logger, opts.decoderAPI, store) dataServer, err := data.NewServer(cmd.Context(), logger, opts.data, store) if err != nil { diff --git a/pkg/cmd/decoder/unload/unload.go b/pkg/cmd/decoder/unload/unload.go index dfa4487..6bf82cf 100644 --- a/pkg/cmd/decoder/unload/unload.go +++ b/pkg/cmd/decoder/unload/unload.go @@ -17,32 +17,27 @@ const ( ) type runOptions struct { - schema *schema.ClientOptions - general *common.GeneralOptions + decoderAPI *schema.Options + general *common.GeneralOptions configFiles []string - configs []DecoderUnloadConfig + configs []*common.CodeletsetConfig } func addToFlags(flags *pflag.FlagSet, opts *runOptions) { flags.StringArrayVarP(&opts.configFiles, "config", "c", []string{}, "configuration files to unload") } -func (o *runOptions) parse() error { - configs, err := fromFiles(o.configFiles...) - if err != nil { - return err - } - o.configs = configs - - return nil +func (o *runOptions) parse() (err error) { + o.configs, err = common.CodeletsetConfigFromFiles(o.configFiles...) 
+ return } // Command Unload a schema from a local decoder func Command(opts *common.GeneralOptions) *cobra.Command { runOptions := &runOptions{ - schema: &schema.ClientOptions{}, - general: opts, + decoderAPI: &schema.Options{}, + general: opts, } cmd := &cobra.Command{ Use: "unload", @@ -54,14 +49,14 @@ func Command(opts *common.GeneralOptions) *cobra.Command { SilenceUsage: true, } addToFlags(cmd.PersistentFlags(), runOptions) - schema.AddClientOptionsToFlags(cmd.PersistentFlags(), runOptions.schema) + schema.AddOptionsToFlags(cmd.PersistentFlags(), runOptions.decoderAPI) return cmd } func run(cmd *cobra.Command, opts *runOptions) error { if err := errors.Join( opts.general.Parse(), - opts.schema.Parse(), + opts.decoderAPI.Parse(), opts.parse(), ); err != nil { return err @@ -69,7 +64,7 @@ func run(cmd *cobra.Command, opts *runOptions) error { logger := opts.general.Logger - client, err := schema.NewClient(cmd.Context(), logger, opts.schema) + client, err := schema.NewClient(cmd.Context(), logger, opts.decoderAPI) if err != nil { return err } @@ -78,11 +73,8 @@ func run(cmd *cobra.Command, opts *runOptions) error { for _, config := range opts.configs { for _, desc := range config.CodeletDescriptor { - for _, io := range desc.InIOChannel { - streamUUIDs = append(streamUUIDs, io.streamUUID) - } for _, io := range desc.OutIOChannel { - streamUUIDs = append(streamUUIDs, io.streamUUID) + streamUUIDs = append(streamUUIDs, io.StreamUUID) } } } diff --git a/pkg/cmd/decoder/unload/unload_config.go b/pkg/cmd/decoder/unload/unload_config.go deleted file mode 100644 index 886e50b..0000000 --- a/pkg/cmd/decoder/unload/unload_config.go +++ /dev/null @@ -1,78 +0,0 @@ -package unload - -import ( - "errors" - "fmt" - "jbpf_protobuf_cli/common" - - "github.com/google/uuid" - "gopkg.in/yaml.v3" -) - -// IOChannelConfig represents the configuration for an IO channel -type IOChannelConfig struct { - StreamID string `yaml:"stream_id"` - - streamUUID uuid.UUID -} - -// CodeletDescriptorConfig represents the configuration for a codelet descriptor -type CodeletDescriptorConfig struct { - InIOChannel []*IOChannelConfig `yaml:"in_io_channel"` - OutIOChannel []*IOChannelConfig `yaml:"out_io_channel"` -} - -// DecoderUnloadConfig represents the configuration for unloading a decoder -type DecoderUnloadConfig struct { - CodeletDescriptor []*CodeletDescriptorConfig `yaml:"codelet_descriptor"` -} - -func (io *IOChannelConfig) verify() error { - streamUUID, err := uuid.Parse(io.StreamID) - if err != nil { - return err - } - io.streamUUID = streamUUID - return nil -} - -func fromFiles(configs ...string) ([]DecoderUnloadConfig, error) { - out := make([]DecoderUnloadConfig, 0, len(configs)) - errs := make([]error, 0, len(configs)) - -configLoad: - for _, c := range configs { - f, err := common.NewFile(c) - if err != nil { - errs = append(errs, fmt.Errorf("failed to read file %s: %w", c, err)) - continue - } - var config DecoderUnloadConfig - if err := yaml.Unmarshal(f.Data, &config); err != nil { - errs = append(errs, fmt.Errorf("failed to unmarshal file %s: %w", c, err)) - continue - } - - for _, desc := range config.CodeletDescriptor { - for _, io := range desc.InIOChannel { - if err := io.verify(); err != nil { - errs = append(errs, fmt.Errorf("failed to verify in_io_channel in file %s: %w", c, err)) - continue configLoad - } - } - for _, io := range desc.OutIOChannel { - if err := io.verify(); err != nil { - errs = append(errs, fmt.Errorf("failed to verify out_io_channel in file %s: %w", c, err)) - continue configLoad 
- } - } - } - - out = append(out, config) - } - if err := errors.Join(errs...); err != nil { - return nil, err - } - - return out, nil -} diff --git a/pkg/cmd/input/forward/forward.go b/pkg/cmd/input/forward/forward.go new file mode 100644 index 0000000..8b08e1d --- /dev/null +++ b/pkg/cmd/input/forward/forward.go @@ -0,0 +1,194 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. + +package forward + +import ( + "encoding/json" + "errors" + "fmt" + "jbpf_protobuf_cli/common" + "jbpf_protobuf_cli/jbpf" + "os" + + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" +) + +type runOptions struct { + jbpf *jbpf.Options + general *common.GeneralOptions + + compiledProtos map[string]*common.File + configFiles []string + configs []*common.CodeletsetConfig + filePath string + inlineJSON string + payload string + streamID string + streamUUID uuid.UUID +} + +func addToFlags(flags *pflag.FlagSet, opts *runOptions) { + flags.StringArrayVarP(&opts.configFiles, "config", "c", []string{}, "configuration files to load") + flags.StringVar(&opts.streamID, "stream-id", "00000000-0000-0000-0000-000000000000", "stream ID") + flags.StringVarP(&opts.filePath, "file", "f", "", "path to file containing payload in JSON format") + flags.StringVarP(&opts.inlineJSON, "inline-json", "j", "", "inline payload in JSON format") +} + +func (o *runOptions) parse() (err error) { + o.payload, err = loadInlineJSONOrFromFile(o.inlineJSON, o.filePath) + if err != nil { + return + } + + o.streamUUID, err = uuid.Parse(o.streamID) + if err != nil { + return + } + + o.configs, err = common.CodeletsetConfigFromFiles(o.configFiles...) + if err != nil { + return + } + + o.compiledProtos, err = common.LoadCompiledProtos(o.configs, true, false) + return +} + +// Command Load a schema to a local decoder +func Command(opts *common.GeneralOptions) *cobra.Command { + runOptions := &runOptions{ + jbpf: &jbpf.Options{}, + general: opts, + } + cmd := &cobra.Command{ + Use: "forward", + Short: "Load a control message", + Long: "Load a control message", + RunE: func(cmd *cobra.Command, _ []string) error { + return run(cmd, runOptions) + }, + SilenceUsage: true, + } + addToFlags(cmd.PersistentFlags(), runOptions) + jbpf.AddOptionsToFlags(cmd.PersistentFlags(), runOptions.jbpf) + return cmd +} + +func run(_ *cobra.Command, opts *runOptions) error { + if err := errors.Join( + opts.general.Parse(), + opts.jbpf.Parse(), + opts.parse(), + ); err != nil { + return err + } + + logger := opts.general.Logger + + client, err := jbpf.NewClient(logger, opts.jbpf) + if err != nil { + return err + } + + msg, err := getMessageInstance(opts.configs, opts.compiledProtos, opts.streamUUID) + if err != nil { + return err + } + + err = protojson.Unmarshal([]byte(opts.payload), msg) + if err != nil { + logger.WithError(err).Error("error unmarshalling payload") + return err + } + + logger.WithFields(logrus.Fields{ + "msg": fmt.Sprintf("%T - \"%v\"", msg, msg), + }).Info("sending msg") + + payload, err := proto.Marshal(msg) + if err != nil { + return err + } + + out := append(opts.streamUUID[:], payload...) 
+ + return client.Write(out) +} + +func loadInlineJSONOrFromFile(inlineJSON, filePath string) (string, error) { + if (len(inlineJSON) > 0 && len(filePath) > 0) || (len(inlineJSON) == 0 && len(filePath) == 0) { + return "", errors.New("exactly one of --file or --inline-json can be specified") + } + + if len(filePath) != 0 { + if fi, err := os.Stat(filePath); err != nil { + return "", err + } else if fi.IsDir() { + return "", fmt.Errorf(`expected "%s" to be a file, got a directory`, filePath) + } + payload, err := os.ReadFile(filePath) + if err != nil { + return "", err + } + var deserializedPayload interface{} + err = json.Unmarshal(payload, &deserializedPayload) + if err != nil { + return "", err + } + return string(payload), nil + } + + var deserializedPayload interface{} + err := json.Unmarshal([]byte(inlineJSON), &deserializedPayload) + if err != nil { + return "", err + } + return inlineJSON, nil +} + +func getMessageInstance(configs []*common.CodeletsetConfig, compiledProtos map[string]*common.File, streamUUID uuid.UUID) (*dynamicpb.Message, error) { + for _, config := range configs { + for _, desc := range config.CodeletDescriptor { + for _, io := range desc.InIOChannel { + if io.StreamUUID == streamUUID { + compiledProto := compiledProtos[io.Serde.Protobuf.PackagePath] + + fds := &descriptorpb.FileDescriptorSet{} + if err := proto.Unmarshal(compiledProto.Data, fds); err != nil { + return nil, err + } + + pd, err := protodesc.NewFiles(fds) + if err != nil { + return nil, err + } + + msgName := protoreflect.FullName(io.Serde.Protobuf.MsgName) + var desc protoreflect.Descriptor + desc, err = pd.FindDescriptorByName(msgName) + if err != nil { + return nil, err + } + + md, ok := desc.(protoreflect.MessageDescriptor) + if !ok { + return nil, fmt.Errorf("failed to cast desc to protoreflect.MessageDescriptor, got %T", desc) + } + + return dynamicpb.NewMessage(md), nil + } + } + } + } + + return nil, fmt.Errorf("stream %s not found in any of the loaded schemas", streamUUID) +} diff --git a/pkg/cmd/input/input.go b/pkg/cmd/input/input.go new file mode 100644 index 0000000..1c3aa42 --- /dev/null +++ b/pkg/cmd/input/input.go @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. 
+ +package input + +import ( + "jbpf_protobuf_cli/cmd/input/forward" + "jbpf_protobuf_cli/common" + + "github.com/spf13/cobra" +) + +// Command returns the input commands +func Command(opts *common.GeneralOptions) *cobra.Command { + cmd := &cobra.Command{ + Use: "input", + Long: "Execute a jbpf input subcommand.", + Short: "Execute a jbpf input subcommand", + } + cmd.AddCommand( + forward.Command(opts), + ) + return cmd +} diff --git a/pkg/common/codeletset_config.go b/pkg/common/codeletset_config.go new file mode 100644 index 0000000..fd0be50 --- /dev/null +++ b/pkg/common/codeletset_config.go @@ -0,0 +1,188 @@ +package common + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/google/uuid" +) + +// ProtobufConfig represents the configuration for a protobuf message +type ProtobufConfig struct { + MsgName string + PackageName string + PackagePath string +} + +func newProtobufConfig(cfg *ProtobufRawConfig) (*ProtobufConfig, error) { + if len(cfg.MsgName) == 0 { + return nil, fmt.Errorf("missing required field serde.protobuf.msg_name") + } + if len(cfg.PackagePath) == 0 { + return nil, fmt.Errorf("missing required field serde.protobuf.package_path") + } + + packagePath := os.ExpandEnv(cfg.PackagePath) + basename := filepath.Base(packagePath) + + return &ProtobufConfig{ + MsgName: cfg.MsgName, + PackageName: strings.TrimSuffix(basename, filepath.Ext(basename)), + PackagePath: packagePath, + }, nil +} + +// SerdeConfig represents the configuration for serialize/deserialize +type SerdeConfig struct { + Protobuf *ProtobufConfig +} + +func newSerdeConfig(cfg *SerdeRawConfig) (*SerdeConfig, error) { + if cfg.Protobuf == nil { + return nil, fmt.Errorf("missing required field serde.protobuf") + } + + protobuf, err := newProtobufConfig(cfg.Protobuf) + if err != nil { + return nil, err + } + + return &SerdeConfig{Protobuf: protobuf}, nil +} + +// IOChannelConfig represents the configuration for an IO channel +type IOChannelConfig struct { + Serde *SerdeConfig + StreamUUID uuid.UUID +} + +func newIOChannelConfig(cfg *IOChannelRawConfig) (*IOChannelConfig, error) { + if cfg.Serde == nil { + return nil, fmt.Errorf("missing required field serde") + } + + serde, err := newSerdeConfig(cfg.Serde) + if err != nil { + return nil, err + } + + streamUUID, err := uuid.Parse(cfg.StreamID) + if err != nil { + return nil, err + } + + return &IOChannelConfig{ + Serde: serde, + StreamUUID: streamUUID, + }, nil +} + +// CodeletDescriptorConfig represents the configuration for a codelet descriptor +type CodeletDescriptorConfig struct { + InIOChannel []*IOChannelConfig + OutIOChannel []*IOChannelConfig +} + +func newCodeletDescriptorConfig(cfg *CodeletDescriptorRawConfig) (*CodeletDescriptorConfig, error) { + inIOChannel := make([]*IOChannelConfig, 0, len(cfg.InIOChannel)) + for _, rawIO := range cfg.InIOChannel { + io, err := newIOChannelConfig(rawIO) + if err != nil { + return nil, err + } + inIOChannel = append(inIOChannel, io) + } + + outIOChannel := make([]*IOChannelConfig, 0, len(cfg.OutIOChannel)) + for _, rawIO := range cfg.OutIOChannel { + io, err := newIOChannelConfig(rawIO) + if err != nil { + return nil, err + } + outIOChannel = append(outIOChannel, io) + } + + return &CodeletDescriptorConfig{ + InIOChannel: inIOChannel, + OutIOChannel: outIOChannel, + }, nil +} + +// CodeletsetConfig represents the configuration for loading a decoder +type CodeletsetConfig struct { + CodeletDescriptor []*CodeletDescriptorConfig +} + +func newCodeletSetConfig(cfg *CodeletsetRawConfig) 
(*CodeletsetConfig, error) { + codeletDescriptors := make([]*CodeletDescriptorConfig, 0, len(cfg.CodeletDescriptor)) + for _, rawDesc := range cfg.CodeletDescriptor { + desc, err := newCodeletDescriptorConfig(rawDesc) + if err != nil { + return nil, err + } + codeletDescriptors = append(codeletDescriptors, desc) + } + return &CodeletsetConfig{CodeletDescriptor: codeletDescriptors}, nil +} + +// LoadCompiledProtos loads the compiled protobuf files from the codeletset config +func LoadCompiledProtos(cfgs []*CodeletsetConfig, includeInIO, includeOutIO bool) (map[string]*File, error) { + compiledProtos := make(map[string]*File) + + for _, c := range cfgs { + for _, desc := range c.CodeletDescriptor { + if includeInIO { + for _, io := range desc.InIOChannel { + if _, ok := compiledProtos[io.Serde.Protobuf.PackagePath]; !ok { + protoPkg, err := NewFile(io.Serde.Protobuf.PackagePath) + if err != nil { + return nil, err + } + compiledProtos[io.Serde.Protobuf.PackagePath] = protoPkg + } + } + } + + if includeOutIO { + for _, io := range desc.OutIOChannel { + if _, ok := compiledProtos[io.Serde.Protobuf.PackagePath]; !ok { + protoPkg, err := NewFile(io.Serde.Protobuf.PackagePath) + if err != nil { + return nil, err + } + compiledProtos[io.Serde.Protobuf.PackagePath] = protoPkg + } + } + } + } + } + + return compiledProtos, nil +} + +// CodeletsetConfigFromFiles reads and unmarshals the given files into a slice of CodeletsetConfig +func CodeletsetConfigFromFiles(configs ...string) ([]*CodeletsetConfig, error) { + out := make([]*CodeletsetConfig, 0, len(configs)) + errs := make([]error, 0, len(configs)) + + for _, c := range configs { + rawConfig, err := newCodeletsetRawConfig(c) + if err != nil { + errs = append(errs, err) + continue + } + + config, err := newCodeletSetConfig(rawConfig) + if err != nil { + errs = append(errs, fmt.Errorf("failed to unpack file %s: %w", c, err)) + continue + } + + out = append(out, config) + } + + return out, errors.Join(errs...) 
+} diff --git a/pkg/common/codeletset_raw_config.go b/pkg/common/codeletset_raw_config.go new file mode 100644 index 0000000..8a13823 --- /dev/null +++ b/pkg/common/codeletset_raw_config.go @@ -0,0 +1,49 @@ +package common + +import ( + "fmt" + + "gopkg.in/yaml.v3" +) + +// ProtobufRawConfig represents the configuration for a protobuf message as defined in the yaml config +type ProtobufRawConfig struct { + MsgName string `yaml:"msg_name"` + PackagePath string `yaml:"package_path"` +} + +// SerdeRawConfig represents the configuration for serialize/deserialize as defined in the yaml config +type SerdeRawConfig struct { + Protobuf *ProtobufRawConfig `yaml:"protobuf"` +} + +// IOChannelRawConfig represents the configuration for an IO channel as defined in the yaml config +type IOChannelRawConfig struct { + Serde *SerdeRawConfig `yaml:"serde"` + StreamID string `yaml:"stream_id"` +} + +// CodeletDescriptorRawConfig represents the configuration for a codelet descriptor as defined in the yaml config +type CodeletDescriptorRawConfig struct { + InIOChannel []*IOChannelRawConfig `yaml:"in_io_channel"` + OutIOChannel []*IOChannelRawConfig `yaml:"out_io_channel"` +} + +// CodeletsetRawConfig represents the configuration for loading a decoder as defined in the yaml config +type CodeletsetRawConfig struct { + CodeletDescriptor []*CodeletDescriptorRawConfig `yaml:"codelet_descriptor"` +} + +func newCodeletsetRawConfig(filePath string) (*CodeletsetRawConfig, error) { + f, err := NewFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read file %s: %w", filePath, err) + } + + var rawConfig CodeletsetRawConfig + if err := yaml.Unmarshal(f.Data, &rawConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal file %s: %w", filePath, err) + } + + return &rawConfig, nil +} diff --git a/pkg/common/options.go b/pkg/common/options.go index e4b3c3d..2968e41 100644 --- a/pkg/common/options.go +++ b/pkg/common/options.go @@ -4,6 +4,10 @@ package common import ( "errors" + "fmt" + "io" + "os" + "strings" "github.com/sirupsen/logrus" "github.com/spf13/pflag" @@ -19,6 +23,8 @@ func NewGeneralOptions(flags *pflag.FlagSet) *GeneralOptions { // NewGeneralOptionsFromLogger creates a new GeneralOptions from a logger func NewGeneralOptionsFromLogger(logger *logrus.Logger) *GeneralOptions { opts := &GeneralOptions{ + file: "", + formatter: "TextFormatter", Logger: logger, logLevel: logger.Level.String(), reportCaller: logger.ReportCaller, @@ -28,6 +34,8 @@ func NewGeneralOptionsFromLogger(logger *logrus.Logger) *GeneralOptions { // GeneralOptions contains the general options for the jbpf cli type GeneralOptions struct { + file string + formatter string logLevel string reportCaller bool @@ -36,6 +44,8 @@ type GeneralOptions struct { func (opts *GeneralOptions) addToFlags(flags *pflag.FlagSet) { flags.BoolVar(&opts.reportCaller, "log-report-caller", false, "show report caller in logs") + flags.StringVar(&opts.file, "log-file", "", "if set, will write logs to file as well as terminal") + flags.StringVar(&opts.formatter, "log-formatter", "TextFormatter", "logger formatter, set to UncoloredTextFormatter, JSONFormatter or TextFormatter") flags.StringVar(&opts.logLevel, "log-level", "info", "log level, set to: panic, fatal, error, warn, info, debug or trace") } @@ -48,13 +58,39 @@ func (opts *GeneralOptions) Parse() error { // GetLogger returns a logger based on the options func (opts *GeneralOptions) getLogger() (*logrus.Logger, error) { - logger := logrus.New() logLev, err := logrus.ParseLevel(opts.logLevel) 
if err != nil { - return logger, err + return nil, err } - logger.SetReportCaller(opts.reportCaller) - logger.SetLevel(logLev) - return logger, nil + var formatter logrus.Formatter + switch strings.ToLower(opts.formatter) { + case "uncoloredtextformatter": + formatter = new(UncoloredTextFormatter) + case "jsonformatter": + formatter = new(logrus.JSONFormatter) + case "textformatter": + formatter = new(logrus.TextFormatter) + default: + return nil, fmt.Errorf("invalid log formatter: %v", opts.formatter) + } + + var out io.Writer = os.Stderr + + if opts.file != "" { + file, err := os.OpenFile(opts.file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + out = io.MultiWriter(os.Stderr, file) + } + + return &logrus.Logger{ + Out: out, + Formatter: formatter, + Hooks: make(logrus.LevelHooks), + Level: logLev, + ExitFunc: os.Exit, + ReportCaller: opts.reportCaller, + }, nil } diff --git a/pkg/common/uncolored_text_formatter.go b/pkg/common/uncolored_text_formatter.go new file mode 100644 index 0000000..9dbf8b9 --- /dev/null +++ b/pkg/common/uncolored_text_formatter.go @@ -0,0 +1,266 @@ +package common + +import ( + "bytes" + "fmt" + "runtime" + "sort" + "strconv" + "strings" + "time" + "unicode/utf8" + + "github.com/sirupsen/logrus" +) + +const ( + defaultTimestampFormat = time.RFC3339 +) + +var ( + baseTimestamp time.Time + levelTextMaxLength int +) + +func init() { + baseTimestamp = time.Now() + + for _, level := range logrus.AllLevels { + levelTextLength := utf8.RuneCount([]byte(level.String())) + if levelTextLength > levelTextMaxLength { + levelTextMaxLength = levelTextLength + } + } +} + +type fieldKey string + +// FieldMap allows customization of the key names for default fields. +type FieldMap map[fieldKey]string + +func (f FieldMap) resolve(key fieldKey) string { + if k, ok := f[key]; ok { + return k + } + + return string(key) +} + +// UncoloredTextFormatter formats logs into text +type UncoloredTextFormatter struct { + // Force quoting of all values + ForceQuote bool + + // DisableQuote disables quoting for all values. + // DisableQuote will have a lower priority than ForceQuote. + // If both of them are set to true, quote will be forced on all values. + DisableQuote bool + + // Override coloring based on CLICOLOR and CLICOLOR_FORCE. - https://bixense.com/clicolors/ + EnvironmentOverrideColors bool + + // Disable timestamp logging. useful when output is redirected to logging + // system that already adds timestamps. + DisableTimestamp bool + + // Enable logging the full timestamp when a TTY is attached instead of just + // the time passed since beginning of execution. + FullTimestamp bool + + // TimestampFormat to use for display when a full timestamp is printed. + // The format to use is the same than for time.Format or time.Parse from the standard + // library. + // The standard Library already provides a set of predefined format. + TimestampFormat string + + // The fields are sorted by default for a consistent output. For applications + // that log extremely frequently and don't use the JSON formatter this may not + // be desired. + DisableSorting bool + + // The keys sorting function, when uninitialized it uses sort.Strings. + SortingFunc func([]string) + + // Disables the truncation of the level text to 4 characters. 
+ DisableLevelTruncation bool + + // PadLevelText Adds padding the level text so that all the levels output at the same length + // PadLevelText is a superset of the DisableLevelTruncation option + PadLevelText bool + + // QuoteEmptyFields will wrap empty fields in quotes if true + QuoteEmptyFields bool + + // FieldMap allows users to customize the names of keys for default fields. + // As an example: + // formatter := &UncoloredTextFormatter{ + // FieldMap: FieldMap{ + // FieldKeyTime: "@timestamp", + // FieldKeyLevel: "@level", + // FieldKeyMsg: "@message"}} + FieldMap FieldMap + + // CallerPrettyfier can be set by the user to modify the content + // of the function and file keys in the data when ReportCaller is + // activated. If any of the returned value is the empty string the + // corresponding key will be removed from fields. + CallerPrettyfier func(*runtime.Frame) (function string, file string) +} + +// Format renders a single log entry +func (f *UncoloredTextFormatter) Format(entry *logrus.Entry) ([]byte, error) { + data := make(logrus.Fields) + for k, v := range entry.Data { + data[k] = v + } + prefixFieldClashes(data, f.FieldMap, entry.HasCaller()) + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + + if !f.DisableSorting { + if f.SortingFunc == nil { + sort.Strings(keys) + } + } + + var b *bytes.Buffer + if entry.Buffer != nil { + b = entry.Buffer + } else { + b = &bytes.Buffer{} + } + + timestampFormat := f.TimestampFormat + if timestampFormat == "" { + timestampFormat = defaultTimestampFormat + } + f.printToBuf(b, entry, keys, data, timestampFormat) + b.WriteByte('\n') + return b.Bytes(), nil +} + +func (f *UncoloredTextFormatter) printToBuf(b *bytes.Buffer, entry *logrus.Entry, keys []string, data logrus.Fields, timestampFormat string) { + levelText := strings.ToUpper(entry.Level.String()) + if !f.DisableLevelTruncation && !f.PadLevelText { + levelText = levelText[0:4] + } + if f.PadLevelText { + // Generates the format string used in the next line, for example "%-6s" or "%-7s". + // Based on the max level text length. 
+ formatString := "%-" + strconv.Itoa(levelTextMaxLength) + "s" + // Formats the level text by appending spaces up to the max length, for example: + // - "INFO " + // - "WARNING" + levelText = fmt.Sprintf(formatString, levelText) + } + + // Remove a single newline if it already exists in the message to keep + // the behavior of logrus text_formatter the same as the stdlib log package + entry.Message = strings.TrimSuffix(entry.Message, "\n") + + caller := "" + if entry.HasCaller() { + funcVal := fmt.Sprintf("%s()", entry.Caller.Function) + fileVal := fmt.Sprintf("%s:%d", entry.Caller.File, entry.Caller.Line) + + if f.CallerPrettyfier != nil { + funcVal, fileVal = f.CallerPrettyfier(entry.Caller) + } + + if fileVal == "" { + caller = funcVal + } else if funcVal == "" { + caller = fileVal + } else { + caller = fileVal + " " + funcVal + } + } + + switch { + case f.DisableTimestamp: + fmt.Fprintf(b, "%s%s %-44s ", levelText, caller, entry.Message) + case !f.FullTimestamp: + fmt.Fprintf(b, "%s[%04d]%s %-44s ", levelText, int(entry.Time.Sub(baseTimestamp)/time.Second), caller, entry.Message) + default: + fmt.Fprintf(b, "%s[%s]%s %-44s ", levelText, entry.Time.Format(timestampFormat), caller, entry.Message) + } + for _, k := range keys { + v := data[k] + fmt.Fprintf(b, " %s=", k) + f.appendValue(b, v) + } +} + +func (f *UncoloredTextFormatter) needsQuoting(text string) bool { + if f.ForceQuote { + return true + } + if f.QuoteEmptyFields && len(text) == 0 { + return true + } + if f.DisableQuote { + return false + } + for _, ch := range text { + if !((ch >= 'a' && ch <= 'z') || + (ch >= 'A' && ch <= 'Z') || + (ch >= '0' && ch <= '9') || + ch == '-' || ch == '.' || ch == '_' || ch == '/' || ch == '@' || ch == '^' || ch == '+') { + return true + } + } + return false +} + +func (f *UncoloredTextFormatter) appendValue(b *bytes.Buffer, value interface{}) { + stringVal, ok := value.(string) + if !ok { + stringVal = fmt.Sprint(value) + } + + if !f.needsQuoting(stringVal) { + b.WriteString(stringVal) + } else { + b.WriteString(fmt.Sprintf("%q", stringVal)) + } +} + +func prefixFieldClashes(data logrus.Fields, fieldMap FieldMap, reportCaller bool) { + timeKey := fieldMap.resolve(logrus.FieldKeyTime) + if t, ok := data[timeKey]; ok { + data["fields."+timeKey] = t + delete(data, timeKey) + } + + msgKey := fieldMap.resolve(logrus.FieldKeyMsg) + if m, ok := data[msgKey]; ok { + data["fields."+msgKey] = m + delete(data, msgKey) + } + + levelKey := fieldMap.resolve(logrus.FieldKeyLevel) + if l, ok := data[levelKey]; ok { + data["fields."+levelKey] = l + delete(data, levelKey) + } + + logrusErrKey := fieldMap.resolve(logrus.FieldKeyLogrusError) + if l, ok := data[logrusErrKey]; ok { + data["fields."+logrusErrKey] = l + delete(data, logrusErrKey) + } + + // If reportCaller is not set, 'func' will not conflict. 
+ if reportCaller { + funcKey := fieldMap.resolve(logrus.FieldKeyFunc) + if l, ok := data[funcKey]; ok { + data["fields."+funcKey] = l + } + fileKey := fieldMap.resolve(logrus.FieldKeyFile) + if l, ok := data[fileKey]; ok { + data["fields."+fileKey] = l + } + } +} diff --git a/pkg/jbpf/options.go b/pkg/jbpf/options.go index 31efcd9..5a981f9 100644 --- a/pkg/jbpf/options.go +++ b/pkg/jbpf/options.go @@ -19,7 +19,6 @@ const ( // Options is the options for the jbpf client type Options struct { - Enable bool ip string keepAlivePeriod time.Duration port uint16 @@ -31,7 +30,6 @@ func AddOptionsToFlags(flags *pflag.FlagSet, opts *Options) { return } - flags.BoolVar(&opts.Enable, "jbpf-enable", false, "whether to allow sending control messages to the jbpf TCP server") flags.DurationVar(&opts.keepAlivePeriod, optionsPrefix+"-keep-alive", 0, "time to keep alive the connection") flags.StringVar(&opts.ip, optionsPrefix+"-ip", defaultIP, "IP address of the jbpf TCP server") flags.Uint16Var(&opts.port, optionsPrefix+"-port", defaultPort, "port address of the jbpf TCP server") @@ -39,9 +37,6 @@ func AddOptionsToFlags(flags *pflag.FlagSet, opts *Options) { // Parse parses the options func (o *Options) Parse() error { - if !o.Enable { - return nil - } _, err := url.ParseRequestURI(fmt.Sprintf("%s://%s:%d", scheme, o.ip, o.port)) if err != nil { return err diff --git a/pkg/main.go b/pkg/main.go index 9d3bc55..1314376 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -3,6 +3,7 @@ package main import ( "context" "jbpf_protobuf_cli/cmd/decoder" + "jbpf_protobuf_cli/cmd/input" "jbpf_protobuf_cli/cmd/serde" "jbpf_protobuf_cli/common" "os" @@ -21,11 +22,12 @@ func main() { func cli() *cobra.Command { cmd := &cobra.Command{ Use: os.Args[0], - Long: "jbpf companion command line tool to generate protobuf assets and a local decoder to interact with a remote jbpf instance over sockets.", + Long: "jbpf companion command line tool to generate protobuf assets. Includes a decoder to receive output data over a UDP socket from a jbpf instance. Messages are then decoded and printed as JSON.
Also provides a mechanism to dispatch input control messages to a jbpf instance via a TCP socket.", } opts := common.NewGeneralOptions(cmd.PersistentFlags()) cmd.AddCommand( decoder.Command(opts), + input.Command(opts), serde.Command(opts), ) return cmd diff --git a/pkg/schema/client.go b/pkg/schema/client.go index b760d3a..be87c72 100644 --- a/pkg/schema/client.go +++ b/pkg/schema/client.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "net/http" + "net/url" "strings" "github.com/google/uuid" @@ -26,9 +27,14 @@ type Client struct { } // NewClient creates a new Client -func NewClient(ctx context.Context, logger *logrus.Logger, opts *ClientOptions) (*Client, error) { +func NewClient(ctx context.Context, logger *logrus.Logger, opts *Options) (*Client, error) { + ip := opts.ip + if len(ip) == 0 { + ip = "localhost" + } + return &Client{ - baseURL: fmt.Sprintf("%s://%s:%d", controlScheme, opts.control.ip, opts.control.port), + baseURL: fmt.Sprintf("%s://%s:%d", controlScheme, ip, opts.port), ctx: ctx, inner: &http.Client{}, logger: logger, @@ -88,7 +94,7 @@ func (c *Client) doDelete(relativePath string) error { return err } - if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) c.logger.WithField("body", buf.String()).WithError(err).Error("unexpected status code") return err @@ -155,7 +161,7 @@ func (c *Client) Unload(streamUUIDs []uuid.UUID) error { errs := make([]error, 0, len(streamUUIDs)) for _, streamUUID := range streamUUIDs { streamIDStr := base64.StdEncoding.EncodeToString(streamUUID[:]) - if err := c.doDelete(fmt.Sprintf("/stream?stream_uuid=%s", streamIDStr)); err != nil { + if err := c.doDelete(fmt.Sprintf("/stream?stream_uuid=%s", url.PathEscape(streamIDStr))); err != nil { err = fmt.Errorf("failed to delete stream ID association %s: %w", streamUUID.String(), err) errs = append(errs, err) continue diff --git a/pkg/schema/client_options.go b/pkg/schema/client_options.go deleted file mode 100644 index 5310ed5..0000000 --- a/pkg/schema/client_options.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. 
- -package schema - -import "github.com/spf13/pflag" - -// ClientOptions is the options for the decoder client -type ClientOptions struct { - control *controlOptions -} - -// AddClientOptionsToFlags adds the client options to the flags -func AddClientOptionsToFlags(flags *pflag.FlagSet, opts *ClientOptions) { - if opts.control == nil { - opts.control = &controlOptions{} - } - - addControlOptionsToFlags(flags, opts.control) -} - -// Parse parses the client options -func (o *ClientOptions) Parse() error { - return o.control.parse() -} diff --git a/pkg/schema/options.go b/pkg/schema/options.go index 75ef1f6..bbef271 100644 --- a/pkg/schema/options.go +++ b/pkg/schema/options.go @@ -13,22 +13,25 @@ const ( // DefaultControlPort is the default used for the local decoder server DefaultControlPort = uint16(20789) - controlPrefix = "decoder-control" + controlPrefix = "decoder-api" controlScheme = "http" defaultControlIP = "" ) -type controlOptions struct { +// Options for internal communication with the decoder +type Options struct { ip string port uint16 } -func addControlOptionsToFlags(flags *pflag.FlagSet, opts *controlOptions) { - flags.StringVar(&opts.ip, controlPrefix+"-ip", defaultControlIP, "IP address of the control HTTP server") - flags.Uint16Var(&opts.port, controlPrefix+"-port", DefaultControlPort, "port address of the control HTTP server") +// AddOptionsToFlags adds the options to the provided flag set +func AddOptionsToFlags(flags *pflag.FlagSet, opts *Options) { + flags.StringVar(&opts.ip, controlPrefix+"-ip", defaultControlIP, "IP address of the decoder HTTP server") + flags.Uint16Var(&opts.port, controlPrefix+"-port", DefaultControlPort, "port address of the decoder HTTP server") } -func (o *controlOptions) parse() error { +// Parse the options +func (o *Options) Parse() error { _, err := url.ParseRequestURI(fmt.Sprintf("%s://%s:%d", controlScheme, o.ip, o.port)) if err != nil { return err diff --git a/pkg/schema/serve.go b/pkg/schema/serve.go index 77f1fde..4747aba 100644 --- a/pkg/schema/serve.go +++ b/pkg/schema/serve.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "os/signal" "strings" @@ -64,53 +65,39 @@ func (s *Server) serveHTTP(ctx context.Context) error { } case http.MethodDelete: - streamUUIDStr := r.URL.Query().Get("streamUUID") - bs, err := base64.StdEncoding.DecodeString(streamUUIDStr) + streamUUIDStr := r.URL.Query().Get("stream_uuid") + unescapedStreamUUIDStr, err := url.PathUnescape(streamUUIDStr) + if err != nil { + s.logger.WithError(err).Error("failed to unescape stream_uuid") w.WriteHeader(http.StatusInternalServerError) return } - streamUUID, err := uuid.FromBytes(bs) + + bs, err := base64.StdEncoding.DecodeString(unescapedStreamUUIDStr) if err != nil { + s.logger.WithError(err).Errorf("failed to decode stream_uuid from %s", unescapedStreamUUIDStr) w.WriteHeader(http.StatusInternalServerError) return } - if err := s.DeleteStreamToSchemaAssociation(r.Context(), streamUUID); err != nil { + + streamUUID, err := uuid.FromBytes(bs) + if err != nil { + s.logger.WithError(err).Error("failed to parse stream id from bytes") w.WriteHeader(http.StatusInternalServerError) return - } else { - w.WriteHeader(http.StatusAccepted) } + s.DeleteStreamToSchemaAssociation(r.Context(), streamUUID) + w.WriteHeader(http.StatusAccepted) + default: w.WriteHeader(http.StatusMethodNotAllowed) } }) - if s.opts.jbpf.Enable { - http.HandleFunc("/control", func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodPost: - body, err := 
readBodyAs[SendControlRequest](r) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - if err := s.SendControl(r.Context(), &body); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } else { - w.WriteHeader(http.StatusOK) - } - - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - }) - } - srv := &http.Server{ - Addr: fmt.Sprintf("%s:%d", s.opts.control.ip, s.opts.control.port), + Addr: fmt.Sprintf("%s:%d", s.opts.ip, s.opts.port), Handler: nil, } diff --git a/pkg/schema/server.go b/pkg/schema/server.go index 9f54de1..25466d8 100644 --- a/pkg/schema/server.go +++ b/pkg/schema/server.go @@ -7,11 +7,9 @@ import ( "crypto/sha1" "encoding/base64" "fmt" - "jbpf_protobuf_cli/jbpf" "path/filepath" "strings" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/descriptorpb" @@ -21,32 +19,20 @@ import ( // Server is a server that implements the DynamicDecoderServer interface type Server struct { - ctx context.Context - jbpfClient *jbpf.Client - logger *logrus.Logger - opts *ServerOptions - store *Store + ctx context.Context + logger *logrus.Logger + opts *Options + store *Store } // NewServer returns a new Server -func NewServer(ctx context.Context, logger *logrus.Logger, opts *ServerOptions, store *Store) (*Server, error) { - var jbpfClient *jbpf.Client - var err error - - if opts.jbpf.Enable { - jbpfClient, err = jbpf.NewClient(logger, opts.jbpf) - if err != nil { - return nil, err - } - } - +func NewServer(ctx context.Context, logger *logrus.Logger, opts *Options, store *Store) *Server { return &Server{ - ctx: ctx, - jbpfClient: jbpfClient, - logger: logger, - opts: opts, - store: store, - }, nil + ctx: ctx, + logger: logger, + opts: opts, + store: store, + } } // Serve starts the server @@ -129,39 +115,8 @@ func (s *Server) AddStreamToSchemaAssociation(_ context.Context, req *AddSchemaA return nil } -// SendControl sends data to the jbpf agent -func (s *Server) SendControl(_ context.Context, req *SendControlRequest) error { - msg, err := s.store.GetProtoMsgInstance(req.StreamUUID) - if err != nil { - s.logger.WithError(err).Errorf("error creating instance of proto message %s", req.StreamUUID.String()) - return err - } - - err = protojson.Unmarshal([]byte(req.Payload), msg) - if err != nil { - s.logger.WithError(err).Error("error unmarshalling payload") - return err - } - - s.logger.WithFields(logrus.Fields{ - "msg": fmt.Sprintf("%T - \"%v\"", msg, msg), - }).Info("sending msg") - - payload, err := proto.Marshal(msg) - if err != nil { - return err - } - - out := append(req.StreamUUID[:], payload...) - if err := s.jbpfClient.Write(out); err != nil { - return err - } - - return nil -} - // DeleteStreamToSchemaAssociation removes the association between a stream and a schema -func (s *Server) DeleteStreamToSchemaAssociation(_ context.Context, req uuid.UUID) error { +func (s *Server) DeleteStreamToSchemaAssociation(_ context.Context, req uuid.UUID) { l := s.logger.WithField("streamUUID", req.String()) if current, ok := s.store.streamToSchema[req]; !ok { @@ -173,6 +128,4 @@ func (s *Server) DeleteStreamToSchemaAssociation(_ context.Context, req uuid.UUI "protoPackage": current.ProtoPackage, }).Info("association removed") } - - return nil } diff --git a/pkg/schema/server_options.go b/pkg/schema/server_options.go deleted file mode 100644 index 8d3d663..0000000 --- a/pkg/schema/server_options.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) Microsoft Corporation. 
All rights reserved. - -package schema - -import ( - "errors" - "jbpf_protobuf_cli/jbpf" - - "github.com/spf13/pflag" -) - -// ServerOptions is the options for the decoder server -type ServerOptions struct { - control *controlOptions - jbpf *jbpf.Options -} - -// AddServerOptionsToFlags adds the server options to the flags -func AddServerOptionsToFlags(flags *pflag.FlagSet, opts *ServerOptions) { - if opts == nil { - return - } - if opts.control == nil { - opts.control = &controlOptions{} - } - if opts.jbpf == nil { - opts.jbpf = &jbpf.Options{} - } - - addControlOptionsToFlags(flags, opts.control) - jbpf.AddOptionsToFlags(flags, opts.jbpf) -} - -// Parse parses the server options -func (o *ServerOptions) Parse() error { - return errors.Join( - o.control.parse(), - o.jbpf.Parse(), - ) -}