Source code for laceworksdk.api.crud_endpoint

# -*- coding: utf-8 -*-
"""Lacework API wrapper."""
from laceworksdk.api.base_endpoint import BaseEndpoint


class CrudEndpoint(BaseEndpoint):
    """A class used to implement CRUD create/read/update/delete functionality for Lacework API Endpoints.

    Every method builds its URL with ``self._build_url`` and issues the request
    through ``self._session``; both are expected to be provided by
    ``BaseEndpoint`` (not shown in this module).
    """

    def __init__(self, session, object_type, endpoint_root="/api/v2"):
        """Initialize the CrudEndpoint class.

        Args:
            session (HttpSession): An instance of the HttpSession class.
            object_type (str): The Lacework object type to use.
            endpoint_root (str, optional): The URL endpoint root to use.
        """
        super().__init__(session, object_type, endpoint_root)

    def create(self, params=None, **request_params):
        """A method to create a new object.

        Args:
            params (any): Parameters forwarded as ``params`` to the session's
                ``post`` call.
            request_params (any): Request parameters; they are passed through
                ``_build_dict_from_items`` and sent as the JSON body.

        Returns:
            dict: JSON containing the new object info
        """
        # Named ``body`` rather than ``json`` so the local does not shadow the
        # name of the standard-library module.
        body = self._build_dict_from_items(request_params)

        response = self._session.post(self._build_url(), json=body, params=params)

        return response.json()

    def get(self, id=None, resource=None, **request_params):
        """A method to get objects.

        Args:
            id (str): A string representing the object ID.
            resource (str): The Lacework API resource type to get.
            request_params (any): A dictionary of parameters to add to the
                request; they are passed through ``_build_dict_from_items``
                and sent as ``params``.

        Returns:
            dict: JSON containing the retrieved object(s)
        """
        # ``id`` shadows the builtin, but it is part of the public keyword
        # interface and therefore must keep its name.
        params = self._build_dict_from_items(request_params)

        response = self._session.get(
            self._build_url(id=id, resource=resource), params=params
        )

        return response.json()

    def search(self, json=None):
        """A method to search objects.

        See the API documentation for this API endpoint for valid fields to
        search against.

        NOTE: While the "value" and "values" fields are marked as "optional"
        you must use one of them, depending on the operation you are using.

        Args:
            json (dict): The desired search parameters:

                - timeFilter (dict, optional): A dict containing the time frame
                  for the search:

                    - startTime (str): The start time for the search
                    - endTime (str): The end time for the search

                - filters (list of dict, optional): Filters based on field
                  contents:

                    - field (str): The name of the data field to which the
                      condition applies
                    - expression (str): The comparison operator for the filter
                      condition. Valid values are:
                      "eq", "ne", "in", "not_in", "like", "ilike", "not_like",
                      "not_ilike", "not_rlike", "rlike", "gt", "ge", "lt",
                      "le", "between"
                    - value (str, optional): The value that the condition
                      checks for in the specified field. Use this attribute
                      when using an operator that requires a single value.
                    - values (list of str, optional): The values that the
                      condition checks for in the specified field. Use this
                      attribute when using an operator that requires multiple
                      values.

                - returns (list of str, optional): The fields to return

        Returns:
            dict: returns a dict containing the search results
        """
        # The search payload is forwarded untouched as the POST body.
        response = self._session.post(self._build_url(action="search"), json=json)

        return response.json()

    def update(self, id=None, params=None, **request_params):
        """A method to update an object.

        Args:
            id (str): A string representing the object ID.
            params (any): Parameters forwarded as ``params`` to the session's
                ``patch`` call.
            request_params (dict): Use to pass any additional parameters the
                API; they are passed through ``_build_dict_from_items`` and
                sent as the JSON body.

        Returns:
            dict: JSON containing the updated object info
        """
        body = self._build_dict_from_items(request_params)

        response = self._session.patch(self._build_url(id=id), json=body, params=params)

        return response.json()

    def delete(self, id, params=None):
        """A method to delete an object.

        Args:
            id (str): A string representing the object GUID.
            params (any): Parameters forwarded as ``params`` to the session's
                ``delete`` call.

        Returns:
            requests.models.Response: a Requests response object containing the response code
        """
        # Unlike the other verbs, the raw response object is returned here
        # (no ``.json()`` call).
        response = self._session.delete(self._build_url(id=id), params=params)

        return response

    def _format_filters(self, filters):
        """A method to properly format the filters object.

        The "enabled" entry, when present, is converted to ``0`` or ``1`` based
        on its truthiness. The conversion is applied to a shallow copy, so the
        dict supplied by the caller is left unmodified.

        Args:
            filters (dict): A dict of filters to be properly formatted (boolean conversion).

        Returns:
            dict: formatted filters object (a new dict)
        """
        # Copy first: rewriting the caller's dict in place would be a hidden
        # side effect on data this method does not own.
        formatted = dict(filters)

        if "enabled" in formatted:
            formatted["enabled"] = int(bool(formatted["enabled"]))

        return formatted