# -*- coding: utf-8 -*-
"""
HttpSession class for package HTTP functions.
"""
import json
import logging
import requests
from datetime import datetime, timezone
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from laceworksdk import version
from laceworksdk.config import (
DEFAULT_ACCESS_TOKEN_EXPIRATION,
DEFAULT_SUCCESS_RESPONSE_CODES,
RATE_LIMIT_RESPONSE_CODE,
)
from laceworksdk.exceptions import ApiError, MalformedResponse, RateLimitError
# Logger shared by everything in this module, named after the module itself.
logger = logging.getLogger(__name__)
class HttpSession:
    """
    Package HttpSession class.

    Wraps a ``requests`` session that retries failed calls, stores the Lacework
    account settings, keeps a bearer access token available, and exposes thin
    ``get``/``post``/``patch``/``put``/``delete`` helpers plus paging generators.
    """

    # Class-level defaults so both attributes exist before __init__ assigns them.
    _access_token = None
    _access_token_expiry = None

    def __init__(
        self, account, subaccount, api_key, api_secret, base_domain, api_token=None
    ):
        """
        Initializes the HttpSession object.

        Args:
          account (str): a Lacework Account name
          subaccount (str): a Lacework Sub-account name
          api_key (str): a Lacework API Key
          api_secret (str): a Lacework API Secret
          base_domain (str): a Lacework Domain (for example "lacework.net");
            there is no default at this level, the caller must supply it
          api_token (str): a Lacework API token (instead of key and secret)

        Returns:
          HttpSession: An instance of this class
        """
        super().__init__()

        # Create a requests session
        self._session = self._retry_session()

        # Set the base parameters
        self._api_key = api_key
        self._api_secret = api_secret
        self._base_domain = base_domain
        self._base_url = f"https://{account}.{self._base_domain}"
        self._account = account
        self._subaccount = subaccount
        self._org_level_access = False
        self._access_token = api_token

        # Get an access token (no-op when a usable api_token was supplied)
        self._check_access_token()

    def _retry_session(
        self,
        retries=3,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 503, 504),
        allowed_methods=None,
    ):
        """
        A method to set up automatic retries on HTTP requests that fail.

        Args:
          retries (int): total number of retries handed to ``Retry``
          backoff_factor (float): backoff factor handed to ``Retry``
          status_forcelist (tuple): HTTP status codes that trigger a retry
          allowed_methods (Any): handed to ``Retry`` unchanged.
            NOTE(review): urllib3 documents ``None`` as "retry on any verb";
            confirm against the urllib3 version in use.

        Returns:
          requests.Session: a session with the retry adapter mounted for
          both "http://" and "https://"
        """
        # Create a new requests session
        session = requests.Session()

        # Establish the retry criteria
        retry_strategy = Retry(
            total=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            allowed_methods=allowed_methods,
            raise_on_status=False,
        )

        # Build the adapter with the retry criteria
        adapter = HTTPAdapter(max_retries=retry_strategy)

        # Bind the adapter to HTTP/HTTPS calls
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def _check_access_token(self):
        """
        A method to check the validity of the access token.

        Fetches a new token when none is held or when the stored expiry has
        passed. A token that was supplied without an expiry is left alone.
        """
        if self._access_token and self._access_token_expiry is None:
            # This catches the case that the user has provided an access token instead of
            # key and secret. We cannot know the expiry date so we simply return
            return

        # Any falsy token (None or "") counts as "no token". Testing only for
        # None let an empty string reach the comparison below with a None
        # expiry, which raised TypeError.
        token_missing = not self._access_token
        if token_missing or self._access_token_expiry < datetime.now(timezone.utc):
            response = self._get_access_token()
            payload = response.json()

            # Parse and restructure the returned date (necessary for Python 3.6)
            expiry_date = payload["expiresAt"].replace("Z", "+0000")

            # Update the access token and expiration
            self._access_token_expiry = datetime.strptime(
                expiry_date, "%Y-%m-%dT%H:%M:%S.%f%z"
            )
            self._access_token = payload["token"]

    def _check_response_code(self, response, expected_response_codes):
        """
        Check the requests.response.status_code to make sure it's one that we expected.

        Args:
          response: object exposing a ``status_code`` attribute
          expected_response_codes: container of acceptable status codes

        Raises:
          RateLimitError: if the status equals RATE_LIMIT_RESPONSE_CODE
          ApiError: for any other unexpected status
        """
        if response.status_code in expected_response_codes:
            return
        if response.status_code == RATE_LIMIT_RESPONSE_CODE:
            raise RateLimitError(response)
        raise ApiError(response)

    def _print_debug_response(self, response):
        """
        Print the debug logging, based on the returned content type.
        """
        logger.debug(response.headers)

        # If it's supposed to be a JSON response, parse and log, otherwise, log the raw text
        if "application/json" in response.headers.get("Content-Type", "").lower():
            try:
                if response.status_code != 204:
                    logger.debug(json.dumps(response.json(), indent=2))
                else:
                    logger.debug("204 No Content Returned")
            except ValueError:
                logger.warning("Error parsing JSON response body")
        else:
            logger.debug(response.text)

    def _get_access_token(self):
        """
        A method to fetch a new access token from Lacework.

        Returns:
          requests.models.Response: the token response; its JSON body is read
          by ``_check_access_token`` for the "token" and "expiresAt" keys

        Raises:
          RateLimitError / ApiError: if the API answers with an unexpected status
          Exception: whatever the underlying session raises when the call
            itself fails (re-raised after logging)
        """
        logger.info("Creating Access Token in Lacework...")

        uri = f"{self._base_url}/api/v2/access/tokens"

        # Build the access token request headers
        headers = {
            "X-LW-UAKS": self._api_secret,
            "Content-Type": "application/json",
            "User-Agent": f"laceworksdk-python-client/{version('laceworksdk')}",
        }

        # Build the access token request data
        data = {"keyId": self._api_key, "expiryTime": DEFAULT_ACCESS_TOKEN_EXPIRATION}

        try:
            response = self._session.post(uri, json=data, headers=headers)
        except Exception:
            # Only a failure of the call itself means "no response". A
            # requests.Response is falsy for every 4xx/5xx status, so a
            # truthiness test on it misreported API errors as missing responses.
            logger.error("Call to _get_access_token() returned no response.")
            raise

        # Validate the response; status errors propagate with their own type
        self._check_response_code(response, DEFAULT_SUCCESS_RESPONSE_CODES)
        self._print_debug_response(response)

        return response

    def _get_request_headers(self, org_access=False):
        """
        A method to build the HTTP request headers for Lacework.

        The headers are written onto the session's own header mapping, so they
        persist between calls; every managed header is therefore refreshed
        (or removed) on each call.

        Args:
          org_access (bool): Whether the request should be performed at the Organization level

        Returns:
          The session's header mapping, updated in place
        """
        # Build the request headers
        headers = self._session.headers
        headers["Authorization"] = f"Bearer {self._access_token}"
        headers["Org-Access"] = (
            "true" if self._org_level_access or org_access else "false"
        )
        headers["User-Agent"] = f"laceworksdk-python-client/{version('laceworksdk')}"

        if self._subaccount:
            headers["Account-Name"] = self._subaccount
        else:
            # The mapping outlives this call: drop a value left over from a
            # previously configured subaccount.
            headers.pop("Account-Name", None)

        # Log a copy with the bearer token masked so credentials stay out of logs
        loggable_headers = dict(headers)
        loggable_headers["Authorization"] = "Bearer ***"
        logger.debug("Request headers: \n" + json.dumps(loggable_headers, indent=2))

        return headers

    def _request(self, method, uri, **kwargs):
        """
        A method to abstract building requests to Lacework.

        Args:
          method (str): The HTTP request method ("GET", "POST", ...)
          uri (str): The URI of the API endpoint
          kwargs (Any): passed on to the requests package

        Returns:
          requests.models.Response: a Requests response object

        Raises:
          ApiError if anything but expected response code is returned
        """
        self._check_access_token()

        # Strip the protocol/host if provided
        domain_begin = uri.find(self._base_domain)
        if domain_begin >= 0:
            domain_end = domain_begin + len(self._base_domain)
            uri = uri[domain_end:]

        uri = f"{self._base_url}{uri}"

        logger.info(f"{method} request to URI: {uri}")

        if "/api/v2/TeamUsers" in uri:
            logger.warning(
                "TeamUsers APIs is currently experimental and subject to change"
            )
        if "/api/v2/UserGroups" in uri:
            logger.warning(
                "UserGroups API is currently experimental and subject to change"
            )

        # Check for 'org' - if True, make an organization-level API call
        # TODO: Remove this on v1.0 release - this is done for back compat
        org = kwargs.pop("org", None)
        headers = self._get_request_headers(org_access=org)

        # Check for 'data' or 'json' (local name avoids shadowing the json module)
        data = kwargs.get("data", "")
        json_body = kwargs.get("json", "")
        if data or json_body:
            logger.debug(f"{method} request data:\nData: {data}\nJSON: {json_body}")

        # TODO: Remove this on v1.0 release - this is done for back compat
        if data and not json_body:
            kwargs["json"] = data
            kwargs.pop("data")

        # Make the HTTP request to the API endpoint
        response = self._session.request(method, uri, headers=headers, **kwargs)

        # Validate the response
        self._check_response_code(response, DEFAULT_SUCCESS_RESPONSE_CODES)
        self._print_debug_response(response)

        # Fix for when Lacework returns a 204 with no data on searches:
        # substitute an empty result set so callers can still call .json()
        if method != "DELETE" and response.status_code == 204:
            try:
                response.json()
            except Exception:
                response._content = b'{"data": []}'

        return response

    @property
    def account(self):
        """
        Returns the current account for the session.
        """
        return self._account

    @property
    def subaccount(self):
        """
        Returns the current subaccount for the session.
        """
        return self._subaccount

    @subaccount.setter
    def subaccount(self, subaccount):
        """
        Modifies the value of the session's subaccount.
        """
        self._subaccount = subaccount

    def get(self, uri, params=None, **kwargs):
        """
        A method to build a GET request to interact with Lacework.

        Args:
          uri (str): uri to send the HTTP GET request to
          params (dict): parameters for the HTTP request
          kwargs (Any): passed on to the requests package

        Returns:
          requests.models.Response: a Requests response object

        Raises:
          ApiError if anything but expected response code is returned
        """
        # Perform a GET request
        return self._request("GET", uri, params=params, **kwargs)

    def get_pages(self, uri, params=None, **kwargs):
        """
        A method to build a GET request that yields pages of data returned by Lacework.

        Each response is yielded before its body is inspected for a
        ``paging.urls.nextPage`` link. Paging stops when that link is absent
        or when the body cannot be decoded as JSON.

        Args:
          uri (str): uri to send the HTTP GET request to
          params (dict): parameters for the HTTP request
          kwargs (Any): passed on to the requests package

        Yields:
          requests.models.Response: one response object per page

        Raises:
          ApiError if anything but expected response code is returned
        """
        response = self.get(uri, params=params, **kwargs)

        while True:
            yield response

            try:
                payload = response.json()
            except ValueError:
                # ValueError matches what _print_debug_response catches and
                # covers the decode errors raised by the json module.
                logger.error(
                    "Failed to decode response from Lacework as JSON.", exc_info=True
                )
                logger.debug(f"Response text: {response.text}")
                break

            # Walk paging -> urls -> nextPage, tolerating a missing or null level
            paging = payload.get("paging") if isinstance(payload, dict) else None
            urls = paging.get("urls") if isinstance(paging, dict) else None
            next_page = urls.get("nextPage") if isinstance(urls, dict) else None

            if not next_page:
                break

            response = self.get(next_page, params=params, **kwargs)

    def get_data_items(self, uri, params=None, **kwargs):
        """
        A method to build a GET request that yields individual objects as returned by Lacework.

        Args:
          uri (str): uri to send the HTTP GET request to
          params (dict): parameters for the HTTP request
          kwargs (Any): passed on to the requests package

        Yields:
          Any: each element of the "data" collection, page after page

        Raises:
          ApiError if anything but expected response code is returned
          MalformedResponse if the returned response does not contain a top-level dictionary with an "data" key.
        """
        # Get generator for pages of JSON data
        for response in self.get_pages(uri, params=params, **kwargs):
            page = response.json()

            # Raise explicitly: an assert would be stripped under ``python -O``
            if not isinstance(page, dict):
                raise MalformedResponse(
                    f"Expected a JSON object at the top level of the response:\n{page}"
                )

            items = page.get("data")
            if items is None:
                error_message = f"'data' key not found in JSON data:\n{page}"
                raise MalformedResponse(error_message)

            yield from items

    def patch(self, uri, data=None, json=None, **kwargs):
        """
        A method to build a PATCH request to interact with Lacework.

        Args:
          uri (str): uri to send the HTTP PATCH request to
          data (Any) : data to be sent in the body of the request
          json (dict): data to be sent in JSON format in the body of the request
          kwargs (Any): passed on to the requests package

        Returns:
          requests.models.Response: a Requests response object

        Raises:
          ApiError if anything but expected response code is returned
        """
        # Perform a PATCH request
        return self._request("PATCH", uri, data=data, json=json, **kwargs)

    def post(self, uri, data=None, json=None, **kwargs):
        """
        A method to build a POST request to interact with Lacework.

        Args:
          uri (str): uri to send the HTTP POST request to
          data (Any) : data to be sent in the body of the request
          json (dict): data to be sent in JSON format in the body of the request
          kwargs (Any): passed on to the requests package

        Returns:
          requests.models.Response: a Requests response object

        Raises:
          ApiError if anything but expected response code is returned
        """
        # Perform a POST request
        return self._request("POST", uri, data=data, json=json, **kwargs)

    def put(self, uri, data=None, json=None, **kwargs):
        """
        A method to build a PUT request to interact with Lacework.

        Args:
          uri (str): uri to send the HTTP PUT request to
          data (Any) : data to be sent in the body of the request
          json (dict): data to be sent in JSON format in the body of the request
          kwargs (Any): passed on to the requests package

        Returns:
          requests.models.Response: a Requests response object

        Raises:
          ApiError if anything but expected response code is returned
        """
        # Perform a PUT request
        return self._request("PUT", uri, data=data, json=json, **kwargs)

    def delete(self, uri, data=None, json=None, **kwargs):
        """
        A method to build a DELETE request to interact with Lacework.

        Args:
          uri (str): uri to send the HTTP DELETE request to
          data (Any) : data to be sent in the body of the request
          json (dict): data to be sent in JSON format in the body of the request
          kwargs (Any): passed on to the requests package

        Returns:
          requests.models.Response: a Requests response object

        Raises:
          ApiError if anything but expected response code is returned
        """
        # Perform a DELETE request
        return self._request("DELETE", uri, data=data, json=json, **kwargs)