# -*- coding: utf-8 -*-
"""Lacework API wrappers."""
import os
from dotenv import load_dotenv
import configparser
from laceworksdk.http_session import HttpSession
from .v2.activities import ActivitiesAPI
from .v2.agent_access_tokens import AgentAccessTokensAPI
from .v2.agent_info import AgentInfoAPI
from .v2.alert_channels import AlertChannelsAPI
from .v2.alert_profiles import AlertProfilesAPI
from .v2.alert_rules import AlertRulesAPI
from .v2.alerts import AlertsAPI
from .v2.audit_logs import AuditLogsAPI
from .v2.cloud_accounts import CloudAccountsAPI
from .v2.cloud_activities import CloudActivitiesAPI
from .v2.configs import ConfigsAPI
from .v2.container_registries import ContainerRegistriesAPI
from .v2.contract_info import ContractInfoAPI
from .v2.datasources import DatasourcesAPI
from .v2.data_export_rules import DataExportRulesAPI
from .v2.entities import EntitiesAPI
from .v2.events import EventsAPI
from .v2.inventory import InventoryAPI
from .v2.organization_info import OrganizationInfoAPI
from .v2.policies import PoliciesAPI
from .v2.policy_exceptions import PolicyExceptionsAPI
from .v2.queries import QueriesAPI
from .v2.report_definitions import ReportDefinitionsAPI
from .v2.report_rules import ReportRulesAPI
from .v2.reports import ReportsAPI
from .v2.resource_groups import ResourceGroupsAPI
from .v2.schemas import SchemasAPI
from .v2.team_members import TeamMembersAPI
from .v2.team_users import TeamUsersAPI
from .v2.user_groups import UserGroupsAPI
from .v2.user_profile import UserProfileAPI
from .v2.vulnerabilities import VulnerabilitiesAPI
from .v2.vulnerability_exceptions import VulnerabilityExceptionsAPI
from .v2.vulnerability_policies import VulnerabilityPoliciesAPI
from laceworksdk.config import (
DEFAULT_BASE_DOMAIN,
LACEWORK_ACCOUNT_ENVIRONMENT_VARIABLE,
LACEWORK_SUBACCOUNT_ENVIRONMENT_VARIABLE,
LACEWORK_API_KEY_ENVIRONMENT_VARIABLE,
LACEWORK_API_SECRET_ENVIRONMENT_VARIABLE,
LACEWORK_API_TOKEN_ENVIRONMENT_VARIABLE,
LACEWORK_API_BASE_DOMAIN_ENVIRONMENT_VARIABLE,
LACEWORK_API_CONFIG_SECTION_ENVIRONMENT_VARIABLE,
LACEWORK_CLI_CONFIG_RELATIVE_PATH,
)
load_dotenv()
[docs]
class LaceworkClient:
"""Lacework API wrapper for Python."""
def __init__(
self,
account=None,
subaccount=None,
api_key=None,
api_secret=None,
api_token=None,
instance=None,
base_domain=None,
profile=None,
):
"""Initializes the Lacework Client object.
Order of operation is:
1. Parameters passed in via the init function (flags).
2. Environmental variables.
3. Configuration file, located in ~/.lacework.toml
:return LaceworkClient object.
"""
# Attempt to use Environment Variables
self._account = (
account or instance or os.getenv(LACEWORK_ACCOUNT_ENVIRONMENT_VARIABLE)
)
self._subaccount = subaccount or os.getenv(
LACEWORK_SUBACCOUNT_ENVIRONMENT_VARIABLE
)
self._api_token = api_token or os.getenv(LACEWORK_API_TOKEN_ENVIRONMENT_VARIABLE)
self._api_key = api_key or os.getenv(LACEWORK_API_KEY_ENVIRONMENT_VARIABLE)
self._api_secret = api_secret or os.getenv(
LACEWORK_API_SECRET_ENVIRONMENT_VARIABLE
)
self._base_domain = (
base_domain
or os.getenv(LACEWORK_API_BASE_DOMAIN_ENVIRONMENT_VARIABLE)
or DEFAULT_BASE_DOMAIN
)
config_file_path = os.path.join(
os.path.expanduser("~"), LACEWORK_CLI_CONFIG_RELATIVE_PATH
)
if os.path.isfile(config_file_path):
profile = profile or os.getenv(
LACEWORK_API_CONFIG_SECTION_ENVIRONMENT_VARIABLE, "default"
)
config_obj = configparser.ConfigParser()
config_obj.read([config_file_path])
if config_obj.has_section(profile):
config_section = config_obj[profile]
api_key = config_section.get("api_key", "").strip('""')
if not self._api_key and api_key:
self._api_key = api_key
api_secret = config_section.get("api_secret", "").strip('""')
if not self._api_secret and api_secret:
self._api_secret = api_secret
subaccount = config_section.get("subaccount", "").strip('""')
if not self._account and not self._subaccount and subaccount:
self._subaccount = subaccount
account = config_section.get("account", "").strip('""')
if not self._account and account:
self._account = account
domain_string = f".{self._base_domain}"
if self._account.endswith(domain_string):
self._account = self._account[: -len(domain_string)]
# Create an HttpSession instance
self._session = HttpSession(
self._account,
self._subaccount,
self._api_key,
self._api_secret,
self._base_domain,
api_token=self._api_token
)
# API Wrappers
self.activities = ActivitiesAPI(self._session)
self.agent_access_tokens = AgentAccessTokensAPI(self._session)
self.agent_info = AgentInfoAPI(self._session)
self.alert_channels = AlertChannelsAPI(self._session)
self.alert_profiles = AlertProfilesAPI(self._session)
self.alert_rules = AlertRulesAPI(self._session)
self.alerts = AlertsAPI(self._session)
self.audit_logs = AuditLogsAPI(self._session)
self.cloud_accounts = CloudAccountsAPI(self._session)
self.cloud_activities = CloudActivitiesAPI(self._session)
self.configs = ConfigsAPI(self._session)
self.container_registries = ContainerRegistriesAPI(self._session)
self.contract_info = ContractInfoAPI(self._session)
self.datasources = DatasourcesAPI(self._session)
self.data_export_rules = DataExportRulesAPI(self._session)
self.entities = EntitiesAPI(self._session)
self.events = EventsAPI(self._session)
self.inventory = InventoryAPI(self._session)
self.organization_info = OrganizationInfoAPI(self._session)
self.policies = PoliciesAPI(self._session)
self.policy_exceptions = PolicyExceptionsAPI(self._session)
self.queries = QueriesAPI(self._session)
self.report_definitions = ReportDefinitionsAPI(self._session)
self.report_rules = ReportRulesAPI(self._session)
self.reports = ReportsAPI(self._session)
self.resource_groups = ResourceGroupsAPI(self._session)
self.schemas = SchemasAPI(self._session)
self.team_members = TeamMembersAPI(self._session)
self.team_users = TeamUsersAPI(self._session)
self.user_groups = UserGroupsAPI(self._session)
self.user_profile = UserProfileAPI(self._session)
self.vulnerabilities = VulnerabilitiesAPI(self._session)
self.vulnerability_exceptions = VulnerabilityExceptionsAPI(self._session)
self.vulnerability_policies = VulnerabilityPoliciesAPI(self._session)
@property
def subaccount(self):
"""Returns the value of the session's subaccount."""
return self._session.subaccount
[docs]
def set_org_level_access(self, org_level_access):
"""A method to set whether the client should use organization-level API calls."""
if org_level_access is True:
self._session._org_level_access = True
else:
self._session._org_level_access = False
[docs]
def set_subaccount(self, subaccount):
"""A method to update the subaccount the client should use for API calls."""
self._session.subaccount = subaccount