Skip to content

[AWSX][Logs Forwarder] Enable Email and IP redaction based on user config only #910

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions aws/logs_monitoring/logs/datadog_scrubber.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ def __init__(self, configs):
if config.name in os.environ:
rules.append(
ScrubbingRule(
compileRegex(config.name, config.pattern), config.placeholder
compileRegex(config.name, config.pattern),
config.placeholder,
config.enabled,
)
)
self._rules = rules

def scrub(self, payload):
for rule in self._rules:
if rule.enabled is False:
continue
try:
payload = rule.regex.sub(rule.placeholder, payload)
except Exception:
Expand All @@ -31,6 +35,7 @@ def scrub(self, payload):


class ScrubbingRule(object):
def __init__(self, regex, placeholder):
def __init__(self, regex, placeholder, enabled):
self.regex = regex
self.placeholder = placeholder
self.enabled = enabled
9 changes: 7 additions & 2 deletions aws/logs_monitoring/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,27 @@ def get_env_var(envvar, default, boolean=False):


class ScrubbingRuleConfig(object):
def __init__(self, name, pattern, placeholder):
def __init__(self, name, pattern, placeholder, enabled=True):
self.name = name
self.pattern = pattern
self.placeholder = placeholder
self.enabled = enabled


# Scrubbing sensitive data
# Option to redact all pattern that looks like an ip address / email address / custom pattern
SCRUBBING_RULE_CONFIGS = [
ScrubbingRuleConfig(
"REDACT_IP", r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", "xxx.xxx.xxx.xxx"
"REDACT_IP",
r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}",
"xxx.xxx.xxx.xxx",
get_env_var("REDACT_IP", "false", boolean=True),
),
ScrubbingRuleConfig(
"REDACT_EMAIL",
r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
"[email protected]",
get_env_var("REDACT_EMAIL", "false", boolean=True),
),
ScrubbingRuleConfig(
"DD_SCRUBBING_RULE",
Expand Down
31 changes: 18 additions & 13 deletions aws/logs_monitoring/tests/test_logs.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import unittest
import os
import sys
from importlib import reload
import unittest.mock

from logs.datadog_scrubber import DatadogScrubber
from logs.datadog_batcher import DatadogBatcher
from logs.helpers import filter_logs
from settings import ScrubbingRuleConfig, SCRUBBING_RULE_CONFIGS, get_env_var


class TestScrubLogs(unittest.TestCase):
@unittest.mock.patch.dict(os.environ, {"REDACT_IP": "true", "REDACT_EMAIL": "true"})
def test_scrubbing_rule_config(self):
os.environ["REDACT_IP"] = ""
os.environ["REDACT_EMAIL"] = ""
reload(sys.modules["settings"])
from settings import SCRUBBING_RULE_CONFIGS

scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
payload = scrubber.scrub(
"ip_address is 127.0.0.1, email is [email protected]"
Expand All @@ -21,17 +25,18 @@ def test_scrubbing_rule_config(self):
os.environ.pop("REDACT_IP", None)
os.environ.pop("REDACT_EMAIL", None)

@unittest.mock.patch.dict(
os.environ,
{
"DD_SCRUBBING_RULE": "[^\u0001-\u007f]+",
"DD_SCRUBBING_RULE_REPLACEMENT": "xxxxx",
},
)
def test_non_ascii(self):
os.environ["DD_SCRUBBING_RULE"] = "[^\u0001-\u007f]+"
scrubber = DatadogScrubber(
[
ScrubbingRuleConfig(
"DD_SCRUBBING_RULE",
get_env_var("DD_SCRUBBING_RULE", default=None),
get_env_var("DD_SCRUBBING_RULE_REPLACEMENT", default="xxxxx"),
)
]
)
reload(sys.modules["settings"])
from settings import SCRUBBING_RULE_CONFIGS

scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
payload = scrubber.scrub("abcdef日本語efgかきくけこhij")
self.assertEqual(payload, "abcdefxxxxxefgxxxxxhij")
os.environ.pop("DD_SCRUBBING_RULE", None)
Expand Down
Loading