Skip to content

Commit

Permalink
feat(ct-metrics): BPF implementation (#1102)
Browse files Browse the repository at this point in the history
# Description

BPF implementation for connection tracking metrics. This is the
data-plane work mentioned in this comment
#1057 (comment)

Summary

- feature flag enableConntrackMetrics
- counters incremented within IFDEF in BPF
- counters: packets forward/reply + bytes forward/reply 
- conntrack metadata includes metrics and is added to packets struct
- add/update unit tests for conntrack_linux and packetparser_linux

## Related Issue

#806

## Checklist

- [x] I have read the [contributing
documentation](https://retina.sh/docs/contributing).
- [x] I signed and signed-off the commits (`git commit -S -s ...`). See
[this
documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification)
on signing commits.
- [x] I have correctly attributed the author(s) of the code.
- [x] I have tested the changes locally.
- [x] I have followed the project's style guidelines.
- [x] I have updated the documentation, if necessary.
- [x] I have added tests, if applicable.

## Screenshots (if applicable) or Testing Completed

Please add any relevant screenshots or GIFs to showcase the changes
made.

1. `enableConntrackMetrics=false`
    ```sh
    # bpftool map dump id 994 -j | jq -r .[0]
    {
      "key": [
        ...
      ],
      "value": [
        ...
      ],
      "formatted": {
        "key": {
          ...
        },
        "value": {
          ...
          "conntrack_metadata": {
            "bytes_forward_count": 0,
            "bytes_reply_count": 0,
            "packets_forward_count": 0,
            "packets_reply_count": 0
          }
        }
      }
    }
    ```
2. `enableConntrackMetrics=true`
    ```sh
    # bpftool map dump id 1019 -j | jq -r .[0]
    {
      "key": [
        ...
      ],
      "value": [
        ...
      ],
      "formatted": {
        "key": {
          ...
        },
        "value": {
          ...,
          "conntrack_metadata": {
            "bytes_forward_count": 13440,
            "bytes_reply_count": 56335,
            "packets_forward_count": 56,
            "packets_reply_count": 43
          }
        }
      }
    }
    ```
    
At userland level I provisionally added a debug statement, just for this
test, in `packetparser_linux.go` (without IP and proto translation)
    
    ```sh
❯ k logs -n kube-system retina-agent-chvdh | head -n 10 | grep metadata
    Defaulted container "retina" out of: retina, init-retina (init)
ts=2024-12-13T10:37:08.881Z level=debug
caller=packetparser/packetparser_linux.go:577 msg="Conntrack metadata"
SrcIp=788657162 DstIp=2499867658 SrcPort=19117 DstPort=23313 Proto=6
PacketsForwardCount=73 PacketsReplyCount=83 BytesForwardCount=16068
BytesReplyCount=6936
ts=2024-12-13T10:37:08.881Z level=debug
caller=packetparser/packetparser_linux.go:577 msg="Conntrack metadata"
SrcIp=788657162 DstIp=2499867658 SrcPort=19117 DstPort=23313 Proto=6
PacketsForwardCount=73 PacketsReplyCount=82 BytesForwardCount=16068
BytesReplyCount=6870
    ```


## Additional Notes

Add any additional notes or context about the pull request here.

---

Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more
information on how to contribute to this project.
  • Loading branch information
SRodi authored Dec 13, 2024
1 parent 6cb7d34 commit 47ccd8b
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ data:
metricsIntervalDuration: {{ .Values.metricsIntervalDuration }}
enableTelemetry: {{ .Values.enableTelemetry }}
enablePodLevel: {{ .Values.enablePodLevel }}
enableConntrackMetrics: {{ .Values.enableConntrackMetrics }}
remoteContext: {{ .Values.remoteContext }}
enableAnnotations: {{ .Values.enableAnnotations }}
bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ data:
metricsIntervalDuration: {{ .Values.metricsIntervalDuration }}
enableTelemetry: {{ .Values.enableTelemetry }}
enablePodLevel: {{ .Values.enablePodLevel }}
enableConntrackMetrics: {{ .Values.enableConntrackMetrics }}
remoteContext: {{ .Values.remoteContext }}
enableAnnotations: {{ .Values.enableAnnotations }}
bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }}
Expand Down
1 change: 1 addition & 0 deletions deploy/legacy/manifests/controller/helm/retina/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ image:
# Overrides the image tag whose default is the chart appVersion.
tag: "v0.0.2"

enableConntrackMetrics: false
enablePodLevel: false
remoteContext: false
enableAnnotations: false
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
EnableTelemetry bool `yaml:"enableTelemetry"`
EnableRetinaEndpoint bool `yaml:"enableRetinaEndpoint"`
EnablePodLevel bool `yaml:"enablePodLevel"`
EnableConntrackMetrics bool `yaml:"enableConntrackMetrics"`
RemoteContext bool `yaml:"remoteContext"`
EnableAnnotations bool `yaml:"enableAnnotations"`
BypassLookupIPOfInterest bool `yaml:"bypassLookupIPOfInterest"`
Expand Down
74 changes: 66 additions & 8 deletions pkg/plugin/conntrack/_cprog/conntrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "compiler.h"
#include "bpf_helpers.h"
#include "conntrack.h"
#include "dynamic.h"

struct tcpmetadata {
__u32 seq; // TCP sequence number
Expand All @@ -15,6 +16,20 @@ struct tcpmetadata {
__u32 tsecr; // TCP timestamp echo reply
};

struct conntrackmetadata {
/*
bytes_*_count indicates the number of bytes sent and received in the forward and reply direction.
These values will be based on the conntrack entry.
*/
__u64 bytes_forward_count;
__u64 bytes_reply_count;
/*
packets_*_count indicates the number of packets sent and received in the forward and reply direction.
These values will be based on the conntrack entry.
*/
__u32 packets_forward_count;
__u32 packets_reply_count;
};

struct packet
{
Expand All @@ -30,6 +45,7 @@ struct packet
__u8 proto;
__u8 flags; // For TCP packets, this is the TCP flags. For UDP packets, this is will always be 1 for conntrack purposes.
bool is_reply;
struct conntrackmetadata conntrack_metadata;
};


Expand Down Expand Up @@ -68,6 +84,7 @@ struct ct_entry {
* before retina deployment and the SYN packet was not captured.
*/
bool is_direction_unknown;
struct conntrackmetadata conntrack_metadata;
};

struct {
Expand Down Expand Up @@ -110,11 +127,11 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) {

/**
* Create a new TCP connection.
* @arg *p pointer to the packet to be processed.
* @arg key The key to be used to create the new connection.
* @arg flags The flags of the packet.
* @arg observation_point The point in the network stack where the packet is observed.
*/
static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key, __u8 flags, __u8 observation_point) {
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point) {
struct ct_entry new_value;
__builtin_memset(&new_value, 0, sizeof(struct ct_entry));
__u64 now = bpf_mono_now();
Expand All @@ -123,9 +140,20 @@ static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key,
return false;
}
new_value.eviction_time = now + CT_SYN_TIMEOUT;
new_value.flags_seen_tx_dir = flags;
new_value.flags_seen_tx_dir = p->flags;
new_value.is_direction_unknown = false;
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);

#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.packets_forward_count = 1;
new_value.conntrack_metadata.bytes_forward_count = p->bytes;
// Update initial conntrack metadata for the connection.
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS

// Update packet
p->is_reply = false;
p->traffic_direction = new_value.traffic_direction;
bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY);
return true;
}
Expand All @@ -148,10 +176,17 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
new_value.flags_seen_tx_dir = p->flags;
new_value.last_report_tx_dir = now;
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY);
#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.packets_forward_count = 1;
new_value.conntrack_metadata.bytes_forward_count = p->bytes;
// Update packet's conntrack metadata.
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));;
#endif // ENABLE_CONNTRACK_METRICS

// Update packet
p->is_reply = false;
p->traffic_direction = new_value.traffic_direction;
bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY);
return true;
}

Expand All @@ -165,11 +200,8 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point) {
// Check if the packet is a SYN packet.
if (p->flags & TCP_SYN) {
// Update packet accordingly.
p->is_reply = false;
p->traffic_direction = _ct_get_traffic_direction(observation_point);
// Create a new connection with a timeout of CT_SYN_TIMEOUT.
return _ct_create_new_tcp_connection(key, p->flags, observation_point);
return _ct_create_new_tcp_connection(p, key, observation_point);
}

// The packet is not a SYN packet and the connection corresponding to this packet is not found.
Expand All @@ -193,13 +225,25 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
p->is_reply = true;
new_value.flags_seen_rx_dir = p->flags;
new_value.last_report_rx_dir = now;
#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.bytes_reply_count = p->bytes;
new_value.conntrack_metadata.packets_reply_count = 1;
#endif // ENABLE_CONNTRACK_METRICS
bpf_map_update_elem(&retina_conntrack, &reverse_key, &new_value, BPF_ANY);
} else { // Otherwise, the packet is considered as a packet in the send direction.
p->is_reply = false;
new_value.flags_seen_tx_dir = p->flags;
new_value.last_report_tx_dir = now;
#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.bytes_forward_count = p->bytes;
new_value.conntrack_metadata.packets_forward_count = 1;
#endif // ENABLE_CONNTRACK_METRICS
bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY);
}
#ifdef ENABLE_CONNTRACK_METRICS
// Update packet's conntrack metadata.
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS
return true;
}

Expand Down Expand Up @@ -318,6 +362,13 @@ static __always_inline __attribute__((unused)) bool ct_process_packet(struct pac
// Update the packet accordingly.
p->is_reply = false;
p->traffic_direction = entry->traffic_direction;
#ifdef ENABLE_CONNTRACK_METRICS
// Update packet count and bytes count on conntrack entry.
WRITE_ONCE(entry->conntrack_metadata.packets_forward_count, READ_ONCE(entry->conntrack_metadata.packets_forward_count) + 1);
WRITE_ONCE(entry->conntrack_metadata.bytes_forward_count, READ_ONCE(entry->conntrack_metadata.bytes_forward_count) + p->bytes);
// Update packet's conntract metadata.
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS
return _ct_should_report_packet(entry, p->flags, CT_PACKET_DIR_TX, &key);
}

Expand All @@ -333,6 +384,13 @@ static __always_inline __attribute__((unused)) bool ct_process_packet(struct pac
// Update the packet accordingly.
p->is_reply = true;
p->traffic_direction = entry->traffic_direction;
#ifdef ENABLE_CONNTRACK_METRICS
// Update packet count and bytes count on conntrack entry.
WRITE_ONCE(entry->conntrack_metadata.packets_reply_count, READ_ONCE(entry->conntrack_metadata.packets_reply_count) + 1);
WRITE_ONCE(entry->conntrack_metadata.bytes_reply_count, READ_ONCE(entry->conntrack_metadata.bytes_reply_count) + p->bytes);
// Update packet's conntract metadata.
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS
return _ct_should_report_packet(entry, p->flags, CT_PACKET_DIR_RX, &reverse_key);
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/plugin/conntrack/_cprog/dynamic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Place holder header file that will be replaced by the actual header file during runtime
// DO NOT DELETE
6 changes: 6 additions & 0 deletions pkg/plugin/conntrack/conntrack_bpfel_arm64.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/plugin/conntrack/conntrack_bpfel_x86.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions pkg/plugin/conntrack/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ package conntrack

import (
"context"
"fmt"
"path"
"runtime"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/rlimit"
"github.com/microsoft/retina/internal/ktime"
"github.com/microsoft/retina/pkg/loader"
"github.com/microsoft/retina/pkg/log"
plugincommon "github.com/microsoft/retina/pkg/plugin/common"
_ "github.com/microsoft/retina/pkg/plugin/conntrack/_cprog" // nolint // This is needed so cprog is included when vendoring
Expand Down Expand Up @@ -66,6 +70,27 @@ func New() (*Conntrack, error) {
return ct, nil
}

// Build dynamic header path
func BuildDynamicHeaderPath() string {
// Get absolute path to this file during runtime.
_, filename, _, ok := runtime.Caller(0)
if !ok {
return ""
}
currDir := path.Dir(filename)
return fmt.Sprintf("%s/%s/%s", currDir, bpfSourceDir, dynamicHeaderFileName)
}

// Generate dynamic header file for conntrack eBPF program.
func GenerateDynamic(ctx context.Context, dynamicHeaderPath string, conntrackMetrics int) error {
st := fmt.Sprintf("#define ENABLE_CONNTRACK_METRICS %d\n", conntrackMetrics)
err := loader.WriteFile(ctx, dynamicHeaderPath, st)
if err != nil {
return errors.Wrap(err, "failed to write conntrack dynamic header")
}
return nil
}

// Run starts the Conntrack garbage collection loop.
func (ct *Conntrack) Run(ctx context.Context) error {
ticker := time.NewTicker(ct.gcFrequency)
Expand Down
92 changes: 92 additions & 0 deletions pkg/plugin/conntrack/conntrack_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package conntrack

import (
"context"
"fmt"
"os"
"path"
"runtime"
"testing"
)

func TestBuildDynamicHeaderPath(t *testing.T) {
tests := []struct {
name string
expectedPath string
}{
{
name: "ExpectedPath",
expectedPath: fmt.Sprintf("%s/%s/%s", path.Dir(getCurrentFilePath(t)), bpfSourceDir, dynamicHeaderFileName),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualPath := BuildDynamicHeaderPath()
if actualPath != tt.expectedPath {
t.Errorf("unexpected dynamic header path: got %q, want %q", actualPath, tt.expectedPath)
}
})
}
}

func TestGenerateDynamic(t *testing.T) {
tests := []struct {
name string
conntrackMetrics int
expectedContents string
}{
{
name: "ConntrackMetricsEnabled",
conntrackMetrics: 1,
expectedContents: "#define ENABLE_CONNTRACK_METRICS 1\n",
},
{
name: "ConntrackMetricsDisabled",
conntrackMetrics: 0,
expectedContents: "#define ENABLE_CONNTRACK_METRICS 0\n",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a temporary directory
tempDir, err := os.MkdirTemp("", "conntrack_test")
if err != nil {
t.Fatalf("failed to create temp directory: %v", err)
}
// Clean up the temporary directory after the test completes
defer os.RemoveAll(tempDir)

// Override the dynamicHeaderPath to use the temporary directory
dynamicHeaderPath := path.Join(tempDir, dynamicHeaderFileName)

// Call the GenerateDynamic function and check if it returns an error.
ctx := context.Background()
if err = GenerateDynamic(ctx, dynamicHeaderPath, tt.conntrackMetrics); err != nil {
t.Fatalf("failed to generate dynamic header: %v", err)
}

// Verify that the dynamic header file was created in the expected location and contains the expected contents.
if _, err = os.Stat(dynamicHeaderPath); os.IsNotExist(err) {
t.Fatalf("dynamic header file does not exist: %v", err)
}

actualContents, err := os.ReadFile(dynamicHeaderPath)
if err != nil {
t.Fatalf("failed to read dynamic header file: %v", err)
}
if string(actualContents) != tt.expectedContents {
t.Errorf("unexpected dynamic header file contents: got %q, want %q", string(actualContents), tt.expectedContents)
}
})
}
}

func getCurrentFilePath(t *testing.T) string {
_, filename, _, ok := runtime.Caller(1)
if !ok {
t.Fatal("failed to determine test file path")
}
return filename
}
5 changes: 4 additions & 1 deletion pkg/plugin/conntrack/types_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
)

const (
defaultGCFrequency = 15 * time.Second
defaultGCFrequency = 15 * time.Second
bpfSourceDir = "_cprog"
bpfSourceFileName = "conntrack.c"
dynamicHeaderFileName = "dynamic.h"
)

type Conntrack struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/plugin/packetparser/_cprog/packetparser.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ static void parse(struct __sk_buff *skb, __u8 obs)
return;
}

#ifdef ENABLE_CONNTRACK_METRICS
// Initialize conntrack metadata in packet struct.
struct conntrackmetadata conntrack_metadata;
__builtin_memset(&conntrack_metadata, 0, sizeof(conntrack_metadata));
p.conntrack_metadata = conntrack_metadata;
#endif // ENABLE_CONNTRACK_METRICS

// Process the packet in ct
bool report __attribute__((unused));
Expand Down
Loading

0 comments on commit 47ccd8b

Please sign in to comment.