dynamic: add sequence scope #2532

Draft
wants to merge 12 commits into master
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,8 @@

### New Features

- add dynamic sequence scope for matching nearby calls within a thread #2532 @williballenthin
Collaborator: naming alternatives to sequence (matching occurs in any order): span, ngram, group/cluster

Collaborator: +1 cluster

@williballenthin (author), Dec 12, 2024: "window", "slice", "range"

Collaborator: math: multiset (or bag, or mset) - https://en.wikipedia.org/wiki/Multiset

  • multiple instances of same object
  • order doesn't matter

Collaborator: optionally prefix with "call", e.g., callbag, callcluster?
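
The multiset suggestion maps naturally onto Python's collections.Counter; a minimal sketch of the order-insensitive, duplicate-counting behavior it describes (the feature strings here are hypothetical):

```python
import collections

# A Counter behaves as a multiset: duplicates are counted, order is irrelevant.
window_a = collections.Counter(["api(CreateFile)", "api(WriteFile)", "api(WriteFile)"])
window_b = collections.Counter(["api(WriteFile)", "api(CreateFile)", "api(WriteFile)"])

# The two call windows compare equal even though the call order differs.
assert window_a == window_b
assert window_a["api(WriteFile)"] == 2
```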


### Breaking Changes

### New Rules (0)
30 changes: 23 additions & 7 deletions capa/capabilities/common.py
@@ -9,17 +9,28 @@
import logging
import itertools
import collections
from typing import Any
from typing import Optional
from dataclasses import dataclass

from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS
from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor

logger = logging.getLogger(__name__)


def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
@dataclass
class FileCapabilities:
features: FeatureSet
matches: MatchResults
feature_count: int


def find_file_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet
) -> FileCapabilities:
file_features: FeatureSet = collections.defaultdict(set)

for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
@@ -36,8 +47,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi

file_features.update(function_features)

_, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return matches, len(file_features)
features, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return FileCapabilities(features, matches, len(file_features))


def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
@@ -62,9 +73,14 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon
return False


def find_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs
) -> tuple[MatchResults, Any]:
@dataclass
class Capabilities:
matches: MatchResults
feature_counts: StaticFeatureCounts | DynamicFeatureCounts
library_functions: Optional[tuple[LibraryFunction, ...]] = None


def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs) -> Capabilities:
from capa.capabilities.static import find_static_capabilities
from capa.capabilities.dynamic import find_dynamic_capabilities

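
One motivation for the new dataclass return types is readability at the call site; a self-contained before/after sketch under that assumption (the stub function and rule name below are hypothetical, not capa's API):

```python
from dataclasses import dataclass


@dataclass
class FileCapabilities:
    features: dict
    matches: dict
    feature_count: int


def find_file_capabilities_stub() -> FileCapabilities:
    # stand-in for the real feature extraction and rule matching
    return FileCapabilities(features={}, matches={"create file": []}, feature_count=42)


# Before: positional unpacking, e.g. `matches, feature_count = ...`,
# silently misbinds when fields are added or reordered.
# After: named fields document themselves at the call site.
caps = find_file_capabilities_stub()
assert caps.feature_count == 42
assert "create file" in caps.matches
```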
211 changes: 169 additions & 42 deletions capa/capabilities/dynamic.py
@@ -9,26 +9,38 @@
import logging
import itertools
import collections
from typing import Any
from dataclasses import dataclass

import capa.perf
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.capabilities.common import find_file_capabilities
from capa.features.address import DynamicCallAddress, DynamicSequenceAddress, _NoAddress
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor

logger = logging.getLogger(__name__)


# The number of calls that make up a sequence.
#
# The larger this is, the more calls are grouped together to match rule logic.
# This means a longer chain can be recognized; however, it's a bit more expensive.
SEQUENCE_SIZE = 20


@dataclass
class CallCapabilities:
features: FeatureSet
matches: MatchResults


def find_call_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> tuple[FeatureSet, MatchResults]:
) -> CallCapabilities:
"""
find matches for the given rules for the given call.

returns: tuple containing (features for call, match results for call)
"""
# all features found for the call.
features: FeatureSet = collections.defaultdict(set)
@@ -46,16 +58,23 @@ def find_call_capabilities(
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])

return features, matches
return CallCapabilities(features, matches)


@dataclass
class ThreadCapabilities:
features: FeatureSet
thread_matches: MatchResults
sequence_matches: MatchResults
call_matches: MatchResults


def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> tuple[FeatureSet, MatchResults, MatchResults]:
) -> ThreadCapabilities:
"""
find matches for the given rules within the given thread.

returns: tuple containing (features for thread, match results for thread, match results for calls)
find matches for the given rules within the given thread,
which includes matches for all the sequences and calls within it.
"""
# all features found within this thread,
# includes features found within calls.
@@ -65,14 +84,94 @@ def find_thread_capabilities(
# might be found at different calls, that's ok.
call_matches: MatchResults = collections.defaultdict(list)

# matches found at the sequence scope.
sequence_matches: MatchResults = collections.defaultdict(list)

# We match sequences as a sliding window of calls of size SEQUENCE_SIZE.
#
# For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
# merging all their features and doing a match.
#
# We track these features in two data structures:
# 1. a deque of those features found in the prior calls.
# We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
# 2. a live set of features seen in the sequence.
# As we pop from the deque, we remove features from the current set,
# and as we push to the deque, we insert features to the current set.
# With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
# The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
# (that is, runtime gets slower the larger SEQUENCE_SIZE is); a standalone sketch of the windowed approach follows this hunk.
sequence_call_addresses: collections.deque[DynamicCallAddress] = collections.deque(maxlen=SEQUENCE_SIZE)
sequence_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
sequence_features: FeatureSet = collections.defaultdict(set)

# the names of rules matched at the last sequence,
# so that we can deduplicate long runs of the same match.
last_sequence_matches: set[str] = set()

call_count = 0
for ch in extractor.get_calls(ph, th):
ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in ifeatures.items():
call_count += 1
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in call_capabilities.features.items():
features[feature].update(vas)

for rule_name, res in imatches.items():
for rule_name, res in call_capabilities.matches.items():
call_matches[rule_name].extend(res)

#
# sequence scope matching
#
sequence_call_addresses.append(ch.address)
# TODO: it would be nice to create this only when needed, since it generates garbage.
sequence_address = DynamicSequenceAddress(
th.address, id=ch.address.id, calls=tuple(address.id for address in sequence_call_addresses)
)

# As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
# While we could rely on `deque.append` with `maxlen` set (which we provide above),
# we want to use the dropped item first, to remove the old features, so we manually pop it here.
if len(sequence_feature_sets) == SEQUENCE_SIZE:
overflowing_feature_set = sequence_feature_sets.popleft()

for feature, vas in overflowing_feature_set.items():
if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
# `vas == { NO_ADDRESS }` without the garbage.
#
# ignore the common case of global features getting added/removed/trimmed repeatedly,
# like arch/os/format.
continue

feature_vas = sequence_features[feature]
feature_vas.difference_update(vas)
if not feature_vas:
del sequence_features[feature]

# update the deque and set of features with the latest call's worth of features.
latest_features = call_capabilities.features
sequence_feature_sets.append(latest_features)
for feature, vas in latest_features.items():
sequence_features[feature].update(vas)

_, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, sequence_address)
# TODO: if smatches: create the sequence location
for rule_name, res in smatches.items():
# TODO: maybe just garbage collect here better.
if rule_name in last_sequence_matches:
# don't emit match results for rules seen during the immediately preceding sequence.
#
# This means that we won't emit duplicate matches when there are multiple sequences
# that overlap a single matching event.
# It also handles the case of a tight loop containing matched logic;
# only the first match will be recorded.
#
# In theory, this means the result document doesn't have *every* possible match location,
# but in practice, humans will only be interested in the first handful anyway.
continue
sequence_matches[rule_name].extend(res)

last_sequence_matches = set(smatches.keys())

for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
features[feature].add(va)

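The sliding-window bookkeeping described in the comments above can be exercised in isolation; a minimal, self-contained sketch of the deque-plus-live-set technique (the window size and feature strings are hypothetical):

```python
import collections

WINDOW = 3  # stands in for SEQUENCE_SIZE

window_feature_sets: collections.deque = collections.deque()
live_features: dict = collections.defaultdict(set)


def push_call(call_features: dict) -> None:
    """slide the window forward by one call, in time proportional to that call, not to WINDOW."""
    if len(window_feature_sets) == WINDOW:
        # evict the oldest call's features from the live set.
        for feature, vas in window_feature_sets.popleft().items():
            live_features[feature].difference_update(vas)
            if not live_features[feature]:
                del live_features[feature]
    window_feature_sets.append(call_features)
    for feature, vas in call_features.items():
        live_features[feature].update(vas)


for i, api in enumerate(["CreateFile", "WriteFile", "CloseHandle", "Sleep"]):
    push_call({f"api({api})": {i}})

# the first call has slid out of the 3-call window by now.
assert "api(CreateFile)" not in live_features
assert "api(Sleep)" in live_features
```

Evicting manually, rather than relying on deque(maxlen=...), is what makes the dropped feature set available for subtraction from the live set, keeping each step independent of the window size; the dedup against last_sequence_matches plays a complementary role, since overlapping windows would otherwise re-report one event up to SEQUENCE_SIZE times.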
@@ -84,16 +183,31 @@ def find_thread_capabilities(
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])

return features, matches, call_matches
logger.debug(
"analyzed thread %d[%d] with %d events, %d features, and %d matches",
th.address.process.pid,
th.address.tid,
call_count,
len(features),
len(matches) + len(sequence_matches) + len(call_matches),
)
return ThreadCapabilities(features, matches, sequence_matches, call_matches)


@dataclass
class ProcessCapabilities:
process_matches: MatchResults
thread_matches: MatchResults
sequence_matches: MatchResults
call_matches: MatchResults
feature_count: int


def find_process_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle
) -> tuple[MatchResults, MatchResults, MatchResults, int]:
) -> ProcessCapabilities:
"""
find matches for the given rules within the given process.

returns: tuple containing (match results for process, match results for threads, match results for calls, number of features)
"""
# all features found within this process,
# includes features found within threads (and calls).
@@ -103,33 +217,48 @@ def find_process_capabilities(
# might be found at different threads, that's ok.
thread_matches: MatchResults = collections.defaultdict(list)

# matches found at the sequence scope.
# might be found at different sequences, that's ok.
sequence_matches: MatchResults = collections.defaultdict(list)

# matches found at the call scope.
# might be found at different calls, that's ok.
call_matches: MatchResults = collections.defaultdict(list)

for th in extractor.get_threads(ph):
features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in features.items():
thread_capabilities = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in thread_capabilities.features.items():
process_features[feature].update(vas)

for rule_name, res in tmatches.items():
for rule_name, res in thread_capabilities.thread_matches.items():
thread_matches[rule_name].extend(res)

for rule_name, res in cmatches.items():
for rule_name, res in thread_capabilities.sequence_matches.items():
sequence_matches[rule_name].extend(res)

for rule_name, res in thread_capabilities.call_matches.items():
call_matches[rule_name].extend(res)

for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()):
process_features[feature].add(va)

_, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
return process_matches, thread_matches, call_matches, len(process_features)

logger.debug(
"analyzed process %d and extracted %d features with %d matches",
ph.address.pid,
len(process_features),
len(process_matches),
)
return ProcessCapabilities(process_matches, thread_matches, sequence_matches, call_matches, len(process_features))


def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
) -> tuple[MatchResults, Any]:
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress: bool = False
) -> Capabilities:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)
all_sequence_matches: MatchResults = collections.defaultdict(list)
all_call_matches: MatchResults = collections.defaultdict(list)

feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=())
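
Each accumulator above is a MatchResults-style mapping from rule name to match locations; a small sketch of the accumulation pattern used below, with hypothetical rule names and addresses:

```python
import collections

# rule name -> list of (address, result) pairs, mirroring how MatchResults is used here.
all_thread_matches: dict = collections.defaultdict(list)

# results for the same rule from two processes are concatenated, not overwritten.
for per_process in ({"spawn thread": [(0x1000, None)]}, {"spawn thread": [(0x2000, None)]}):
    for rule_name, res in per_process.items():
        all_thread_matches[rule_name].extend(res)

assert len(all_thread_matches["spawn thread"]) == 2
```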
@@ -143,19 +272,20 @@ def find_dynamic_capabilities(
) as pbar:
task = pbar.add_task("matching", total=n_processes, unit="processes")
for p in processes:
process_matches, thread_matches, call_matches, feature_count = find_process_capabilities(
ruleset, extractor, p
)
process_capabilities = find_process_capabilities(ruleset, extractor, p)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
rdoc.ProcessFeatureCount(
address=frz.Address.from_capa(p.address), count=process_capabilities.feature_count
),
)
logger.debug("analyzed %s and extracted %d features", p.address, feature_count)

for rule_name, res in process_matches.items():
for rule_name, res in process_capabilities.process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
for rule_name, res in process_capabilities.thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in call_matches.items():
for rule_name, res in process_capabilities.sequence_matches.items():
all_sequence_matches[rule_name].extend(res)
for rule_name, res in process_capabilities.call_matches.items():
all_call_matches[rule_name].extend(res)

pbar.advance(task)
@@ -164,29 +294,26 @@ def find_dynamic_capabilities(
# mapping from feature (matched rule) to set of addresses at which it matched.
process_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(
all_process_matches.items(), all_thread_matches.items(), all_call_matches.items()
all_process_matches.items(), all_thread_matches.items(), all_sequence_matches.items(), all_call_matches.items()
):
locations = {p[0] for p in results}
rule = ruleset[rule_name]
capa.engine.index_rule_matches(process_and_lower_features, rule, locations)

all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = feature_count
all_file_capabilities = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = all_file_capabilities.feature_count

matches = dict(
itertools.chain(
# each rule exists in exactly one scope,
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_call_matches.items(),
all_sequence_matches.items(),
all_thread_matches.items(),
all_process_matches.items(),
all_call_matches.items(),
all_file_matches.items(),
all_file_capabilities.matches.items(),
)
)

meta = {
"feature_counts": feature_counts,
}

return matches, meta
return Capabilities(matches, feature_counts)
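
The final dict merge relies on the invariant called out in the comment: each rule exists in exactly one scope, so the per-scope MatchResults are disjoint. A sketch of why the naive merge is safe (rule names are hypothetical):

```python
import itertools

# disjoint per-scope results: each rule name appears in exactly one mapping.
call_matches = {"read file via call": [("call-addr", None)]}
sequence_matches = {"stage then exfiltrate": [("sequence-addr", None)]}
thread_matches = {"persist via thread": [("thread-addr", None)]}

merged = dict(
    itertools.chain(
        call_matches.items(),
        sequence_matches.items(),
        thread_matches.items(),
    )
)
assert len(merged) == 3
```

If two scopes could ever report the same rule name, dict() would silently keep whichever entry came last in the chain, so the one-scope-per-rule invariant is what makes this merge lossless.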