From 42cac2fd18582ba8581f30507c5f0010e7503f0f Mon Sep 17 00:00:00 2001 From: RomanSemkin Date: Thu, 5 Mar 2026 11:07:26 +0200 Subject: [PATCH] Privx 42.0.1 (#125) *accept and handle booleans for url params * refactor response core: explicit headers and predictable payload helpers * refactor api modules: use explicit response builders for all endpoints * test: strengthen response tests with failure-path and edge-case coverage --- privx_api/api_proxy.py | 42 ++++---- privx_api/auth.py | 28 +++--- privx_api/authorizer.py | 89 ++++++++--------- privx_api/base.py | 74 +++++++++++++- privx_api/connection_manager.py | 73 +++++++------- privx_api/db_proxy.py | 4 +- privx_api/host_store.py | 46 ++++----- privx_api/license_manager.py | 18 ++-- privx_api/monitor_service.py | 16 +-- privx_api/network_access_manager.py | 18 ++-- privx_api/response.py | 138 +++++++++++++++++--------- privx_api/role_store.py | 112 ++++++++++----------- privx_api/secrets_manager.py | 76 +++++++------- privx_api/settings_service.py | 16 +-- privx_api/tests/test_api.py | 108 ++++++++++++++++++++ privx_api/tests/test_response.py | 148 ++++++++++++++++++++++++++++ privx_api/trail_index.py | 10 +- privx_api/user_store.py | 38 +++---- privx_api/vault.py | 30 +++--- privx_api/workflow_engine.py | 32 +++--- setup.py | 2 +- 21 files changed, 738 insertions(+), 380 deletions(-) create mode 100644 privx_api/tests/test_response.py diff --git a/privx_api/api_proxy.py b/privx_api/api_proxy.py index e7d4caf..7959f8e 100644 --- a/privx_api/api_proxy.py +++ b/privx_api/api_proxy.py @@ -16,7 +16,7 @@ def get_api_proxy_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.API_PROXY.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_api_proxy_configuration(self) -> PrivXAPIResponse: """ @@ -26,7 +26,7 @@ def get_api_proxy_configuration(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.API_PROXY.CONFIGURATION) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_api_target(self, api_target: dict) -> PrivXAPIResponse: """ @@ -39,7 +39,7 @@ def create_api_target(self, api_target: dict) -> PrivXAPIResponse: UrlEnum.API_PROXY.API_TARGETS, body=api_target, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_api_targets( self, @@ -66,7 +66,7 @@ def get_api_targets( UrlEnum.API_PROXY.API_TARGETS, query_params=query_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_api_target(self, api_target_id: str) -> PrivXAPIResponse: """ @@ -79,7 +79,7 @@ def get_api_target(self, api_target_id: str) -> PrivXAPIResponse: UrlEnum.API_PROXY.API_TARGET, path_params={"api_target_id": api_target_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_api_target( self, @@ -97,7 +97,7 @@ def update_api_target( path_params={"api_target_id": api_target_id}, body=api_target, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_api_target(self, api_target_id: str) -> PrivXAPIResponse: """ @@ -110,7 +110,7 @@ def delete_api_target(self, api_target_id: str) -> PrivXAPIResponse: UrlEnum.API_PROXY.API_TARGET, path_params={"api_target_id": api_target_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_api_targets( self, @@ -139,7 +139,7 @@ def search_api_targets( query_params=query_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_api_target_tags( self, @@ -162,7 +162,7 @@ def get_api_target_tags( UrlEnum.API_PROXY.API_TARGET_TAGS, query_params=query_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_current_user_client_credential( self, @@ -178,7 +178,7 @@ def create_current_user_client_credential( UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIALS, body=credential, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_current_user_client_credentials( self, @@ -203,7 +203,7 @@ def get_current_user_client_credentials( UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIALS, query_params=query_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_current_user_client_credential( self, @@ -219,7 +219,7 @@ def get_current_user_client_credential( UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL, path_params={"credential_id": credential_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_current_user_client_credential( self, @@ -237,7 +237,7 @@ def update_current_user_client_credential( path_params={"credential_id": credential_id}, body=credential, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_current_user_client_credential( self, @@ -253,7 +253,7 @@ def delete_current_user_client_credential( UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL, path_params={"credential_id": credential_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_current_user_client_credential_secret( self, @@ -269,7 +269,7 @@ def get_current_user_client_credential_secret( UrlEnum.API_PROXY.CURRENT_CLIENT_CREDENTIAL_SECRET, path_params={"credential_id": credential_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_user_client_credential( self, @@ -287,7 +287,7 @@ def create_user_client_credential( path_params={"user_id": user_id}, body=credential, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_user_client_credentials( self, @@ -314,7 +314,7 @@ def get_user_client_credentials( path_params={"user_id": user_id}, query_params=query_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_client_credential( self, @@ -331,7 +331,7 @@ def get_user_client_credential( UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL, path_params={"user_id": user_id, "credential_id": credential_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_user_client_credential( self, @@ -350,7 +350,7 @@ def update_user_client_credential( path_params={"user_id": user_id, "credential_id": credential_id}, body=credential, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_user_client_credential( self, @@ -367,7 +367,7 @@ def delete_user_client_credential( UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL, path_params={"user_id": user_id, "credential_id": credential_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_client_credential_secret( self, @@ -384,4 +384,4 @@ def get_user_client_credential_secret( UrlEnum.API_PROXY.USER_CLIENT_CREDENTIAL_SECRET, path_params={"user_id": user_id, "credential_id": credential_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/auth.py b/privx_api/auth.py index 60c5176..89c51a5 100644 --- a/privx_api/auth.py +++ b/privx_api/auth.py @@ -30,7 +30,7 @@ def get_auth_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.AUTH.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_idp_client(self, idp_id: str) -> PrivXAPIResponse: """ @@ -43,7 +43,7 @@ def get_idp_client(self, idp_id: str) -> PrivXAPIResponse: UrlEnum.AUTH.IDP_CLIENT, path_params={"idp_id": idp_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_idp_client(self, idp_client: dict) -> PrivXAPIResponse: """ @@ -55,7 +55,7 @@ def create_idp_client(self, idp_client: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.AUTH.IDP_CLIENTS, body=idp_client ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def update_idp_client(self, idp_id: str, idp_client: dict) -> PrivXAPIResponse: """ @@ -67,7 +67,7 @@ def update_idp_client(self, idp_id: str, idp_client: dict) -> PrivXAPIResponse: response_status, data = self._http_put( UrlEnum.AUTH.IDP_CLIENT, path_params={"idp_id": idp_id}, body=idp_client ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_idp_client(self, idp_id: str) -> PrivXAPIResponse: """ @@ -79,7 +79,7 @@ def delete_idp_client(self, idp_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.AUTH.IDP_CLIENT, path_params={"idp_id": idp_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def regenerate_idp_client_config(self, idp_id: str) -> PrivXAPIResponse: """ @@ -92,7 +92,7 @@ def regenerate_idp_client_config(self, idp_id: str) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.AUTH.REGENERATE_IDP_CLIENT, path_params={"idp_id": idp_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_sessions( self, @@ -117,7 +117,7 @@ def get_user_sessions( query_params=search_params, path_params={"user_id": user_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_source_sessions( self, @@ -142,7 +142,7 @@ def get_source_sessions( query_params=search_params, path_params={"source_id": source_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_sessions( self, @@ -167,7 +167,7 @@ def search_sessions( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def terminate_session(self, session_id: str) -> PrivXAPIResponse: """ @@ -180,7 +180,7 @@ def terminate_session(self, session_id: str) -> PrivXAPIResponse: UrlEnum.AUTH.TERMINATE_SESSION, path_params={"session_id": session_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def terminate_user_sessions(self, user_id: str) -> PrivXAPIResponse: """ @@ -193,7 +193,7 @@ def terminate_user_sessions(self, user_id: str) -> PrivXAPIResponse: UrlEnum.AUTH.TERMINATE_USER_SESSIONS, path_params={"user_id": user_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def logout(self) -> PrivXAPIResponse: """ @@ -203,7 +203,7 @@ def logout(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_post(UrlEnum.AUTH.LOGOUT) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_mobile_devices(self, user_id: str) -> PrivXAPIResponse: """ @@ -216,7 +216,7 @@ def get_user_mobile_devices(self, user_id: str) -> PrivXAPIResponse: UrlEnum.AUTH.MGW_USER_DEVICES, path_params={"user_id": user_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def unpair_user_mobile_device( self, user_id: str, device_id: str @@ -231,4 +231,4 @@ def unpair_user_mobile_device( UrlEnum.AUTH.MGW_USER_DEVICES_UNPAIR, path_params={"user_id": user_id, "device_id": device_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/authorizer.py b/privx_api/authorizer.py index a6e660b..d2c41b4 100644 --- a/privx_api/authorizer.py +++ b/privx_api/authorizer.py @@ -16,7 +16,7 @@ def get_authorizer_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.AUTHORIZER.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_authorizer_cert( self, access_group_id: Optional[str] = None @@ -33,7 +33,7 @@ def get_authorizer_cert( {"access_group_id": access_group_id} if access_group_id else None ), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def download_authorizer_cert(self, cert_id: str) -> PrivXStreamResponse: """ @@ -45,7 +45,7 @@ def download_authorizer_cert(self, cert_id: str) -> PrivXStreamResponse: response = self._http_stream( UrlEnum.AUTHORIZER.AUTHORIZER_CERT_ID, path_params={"id": cert_id} ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def download_cert_revocation_list(self, cert_id: str) -> PrivXStreamResponse: """ @@ -57,7 +57,7 @@ def download_cert_revocation_list(self, cert_id: str) -> PrivXStreamResponse: response = self._http_stream( UrlEnum.AUTHORIZER.CERT_REVOCATION_LIST, path_params={"id": cert_id} ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def get_target_host_credentials( self, @@ -73,7 +73,7 @@ def get_target_host_credentials( UrlEnum.AUTHORIZER.TARGET_HOST, body=target_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_principals(self) -> PrivXAPIResponse: """ @@ -83,7 +83,7 @@ def get_principals(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.AUTHORIZER.PRINCIPALS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_principal( self, @@ -106,7 +106,7 @@ def get_principal( path_params={"group_id": group_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_principal_key( self, group_id: str, key_id: Optional[str] = None @@ -123,7 +123,7 @@ def delete_principal_key( path_params={"group_id": group_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_principal_key( self, @@ -139,7 +139,7 @@ def create_principal_key( UrlEnum.AUTHORIZER.CREATE_GROUP_PRINCIPAL_KEY, path_params={"group_id": group_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def import_principal_key( self, group_id: str, principal_key_params: dict @@ -155,7 +155,7 @@ def import_principal_key( path_params={"group_id": group_id}, body=principal_key_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def sign_with_principal_key( self, @@ -176,12 +176,11 @@ def sign_with_principal_key( query_params=search_params, body=sign_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_component_certs( self, ca_type: str, - access_group_id: Optional[str] = None, ) -> PrivXAPIResponse: """ Gets authorizer's CA certificates. @@ -190,13 +189,11 @@ def get_component_certs( Returns: PrivXAPIResponse """ - search_params = self._get_search_params(access_group_id=access_group_id) response_status, data = self._http_get( UrlEnum.AUTHORIZER.COMPONENT_CERTS, path_params={"ca_type": ca_type}, - query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def download_component_cert( self, ca_type: str, cert_id: str @@ -212,7 +209,7 @@ def download_component_cert( UrlEnum.AUTHORIZER.COMPONENT_CERT, path_params={"id": cert_id, "ca_type": ca_type}, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def download_component_cert_crl( self, @@ -230,7 +227,7 @@ def download_component_cert_crl( UrlEnum.AUTHORIZER.COMPONENT_CERT_REVOCATION_LIST, path_params={"id": cert_id, "ca_type": ca_type}, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def create_extender_config_download_handle( self, @@ -246,7 +243,7 @@ def create_extender_config_download_handle( UrlEnum.AUTHORIZER.EXTENDER_CONFIG_SESSION_ID, path_params={"trusted_client_id": trusted_client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def download_extender_config( self, @@ -266,7 +263,7 @@ def download_extender_config( "session_id": session_id, }, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def create_deployment_script_download_handle( self, trusted_client_id: str @@ -281,7 +278,7 @@ def create_deployment_script_download_handle( UrlEnum.AUTHORIZER.DEPLOYMENT_SCRIPT_SESSION_ID, path_params={"trusted_client_id": trusted_client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def download_deployment_script( self, @@ -301,7 +298,7 @@ def download_deployment_script( "session_id": session_id, }, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def download_principal_command_script(self) -> PrivXStreamResponse: """ @@ -313,7 +310,7 @@ def download_principal_command_script(self) -> PrivXStreamResponse: response = self._http_stream( UrlEnum.AUTHORIZER.DOWNLOAD_COMMAND_SCRIPT, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def create_carrier_config_download_handle( self, trusted_client_id: str @@ -328,7 +325,7 @@ def create_carrier_config_download_handle( UrlEnum.AUTHORIZER.CARRIER_CONFIG_SESSION_ID, path_params={"trusted_client_id": trusted_client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def download_carrier_config( self, @@ -348,7 +345,7 @@ def download_carrier_config( "session_id": session_id, }, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def create_web_proxy_config_download_handle( self, trusted_client_id: str @@ -363,7 +360,7 @@ def create_web_proxy_config_download_handle( UrlEnum.AUTHORIZER.WEB_PROXY_CONFIG_SESSION_ID, path_params={"trusted_client_id": trusted_client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def download_web_proxy_config( self, @@ -383,7 +380,7 @@ def download_web_proxy_config( "session_id": session_id, }, ) - return PrivXStreamResponse(response, HTTPStatus.OK) + return self._stream_api_response(response, HTTPStatus.OK) def get_cert_auth_templates( self, service: Optional[str] = None @@ -399,7 +396,7 @@ def get_cert_auth_templates( UrlEnum.AUTHORIZER.CERT_AUTH_TEMPLATES, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_ssl_trust_anchor(self) -> PrivXAPIResponse: """ @@ -411,7 +408,7 @@ def get_ssl_trust_anchor(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.AUTHORIZER.SSL_TRUST_ANCHOR, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_extender_trust_anchor(self) -> PrivXAPIResponse: """ @@ -423,7 +420,7 @@ def get_extender_trust_anchor(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.AUTHORIZER.EXTENDER_TRUST_ANCHOR, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_access_groups( self, @@ -448,7 +445,7 @@ def get_access_groups( UrlEnum.AUTHORIZER.ACCESS_GROUPS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_access_group(self, access_group_params: dict) -> PrivXAPIResponse: """ @@ -461,7 +458,7 @@ def create_access_group(self, access_group_params: dict) -> PrivXAPIResponse: UrlEnum.AUTHORIZER.ACCESS_GROUPS, body=access_group_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def search_access_groups( self, @@ -488,7 +485,7 @@ def search_access_groups( query_params=search_params, body=get_value(access_group_params, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_access_group(self, access_group_id: str) -> PrivXAPIResponse: """ @@ -501,7 +498,7 @@ def get_access_group(self, access_group_id: str) -> PrivXAPIResponse: UrlEnum.AUTHORIZER.ACCESS_GROUP, path_params={"id": access_group_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_access_group( self, access_group_id: str, access_group_params: dict @@ -517,7 +514,7 @@ def update_access_group( path_params={"id": access_group_id}, body=access_group_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_access_group(self, access_group_id: str) -> PrivXAPIResponse: """ @@ -529,7 +526,7 @@ def delete_access_group(self, access_group_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.AUTHORIZER.ACCESS_GROUP, path_params={"id": access_group_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_access_group_CA_key( self, @@ -545,7 +542,7 @@ def get_access_group_CA_key( UrlEnum.AUTHORIZER.CREATE_ACCESS_GROUP_CA_KEY, path_params={"id": access_group_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_access_group_CA_key( self, access_group_id: str, ca_id: str @@ -560,7 +557,7 @@ def delete_access_group_CA_key( UrlEnum.AUTHORIZER.DELETE_ACCESS_GROUP_CA_KEY, path_params={"id": access_group_id, "ca_id": ca_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_certificates( self, @@ -587,7 +584,7 @@ def search_certificates( query_params=search_params, body=cert_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_certificates_list(self) -> PrivXAPIResponse: """ @@ -599,7 +596,7 @@ def get_certificates_list(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.AUTHORIZER.GET_CERTIFICATES_LIST, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_certificate_by_id(self, cert_id: str) -> PrivXAPIResponse: """ @@ -612,7 +609,7 @@ def get_certificate_by_id(self, cert_id: str) -> PrivXAPIResponse: UrlEnum.AUTHORIZER.GET_CERT_BY_ID, path_params={"id": cert_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_account_secrets( self, @@ -630,7 +627,7 @@ def get_account_secrets( UrlEnum.AUTHORIZER.ACCOUNT_SECRETS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_account_secrets( self, @@ -651,7 +648,7 @@ def search_account_secrets( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def checkout_account_secrets( self, @@ -666,7 +663,7 @@ def checkout_account_secrets( response_status, data = self._http_post( UrlEnum.AUTHORIZER.CHECKOUT_ACCOUNT_SECRETS, body=checkout_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_account_secret_checkouts( self, @@ -684,7 +681,7 @@ def get_account_secret_checkouts( UrlEnum.AUTHORIZER.CHECKOUT_ACCOUNT_SECRETS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_account_secret_checkout(self, id: str) -> PrivXAPIResponse: """ @@ -696,7 +693,7 @@ def get_account_secret_checkout(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.AUTHORIZER.CHECKOUT_ACCOUNT_SECRET, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def release_account_secret_checkout(self, id: str) -> PrivXAPIResponse: """ @@ -708,4 +705,4 @@ def release_account_secret_checkout(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.AUTHORIZER.RELEASE_ACCOUNT_SECRET, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/base.py b/privx_api/base.py index 099c41a..efe07ec 100644 --- a/privx_api/base.py +++ b/privx_api/base.py @@ -12,6 +12,7 @@ from privx_api.cookie_jar import RoutingCookieJar from privx_api.enums import NO_AUTH_STATUS_URLS, UrlEnum from privx_api.exceptions import InternalAPIException +from privx_api.response import PrivXAPIResponse, PrivXStreamResponse def format_path_components(format_str: str, **kw) -> str: @@ -87,6 +88,7 @@ def __init__( self._access_token_age = None self._re_auth_margin = re_auth_margin self._cookie_jar = RoutingCookieJar() if use_cookies else None + self._last_response_headers = {} def _authenticate(self, username: str, password: str) -> None: # saving the creds for the re-auth purposes @@ -187,9 +189,23 @@ def _get_headers(self, url: str) -> dict: headers["Cookie"] = cookie_header return headers - def _get_search_params(self, **kwargs: Union[str, int]) -> dict: - params = {key: val for key, val in kwargs.items() if val is not None} - return params if any(params) else {} + def _get_search_params(self, **kwargs: Union[str, int, bool]) -> dict: + """Normalize query params before encoding into URLs. + + PrivX backends have inconsistent validation and case handling for query + values. To keep requests predictable across services and versions, the + SDK intentionally normalizes non-enum string values to lowercase. + Boolean values are serialized as lowercase "true"/"false". + """ + params = {} + for key, value in kwargs.items(): + if value is None: + continue + if isinstance(value, bool): + value = "true" if value else "false" + + params[key] = value + return params if params else {} def _get_url(self, name: str) -> str: url = UrlEnum.get(name) @@ -197,6 +213,46 @@ def _get_url(self, name: str) -> str: raise InternalAPIException("URL missing: ", name) return url + def _collect_headers(self, response: HTTPResponse) -> dict: + if getattr(response, "headers", None): + headers = dict(response.headers) + elif getattr(response, "msg", None): + headers = dict(response.msg.items()) + else: + headers = dict(response.getheaders()) + return {k.lower(): v for k, v in headers.items()} + + def _store_response_headers(self, headers: dict) -> None: + self._last_response_headers = dict(headers) + + @property + def last_response_headers(self) -> dict: + return dict(self._last_response_headers) + + def _api_response( + self, + response_status: http.HTTPStatus, + expected_status: int, + data: Union[dict, str, list, bytes, None], + ) -> PrivXAPIResponse: + return PrivXAPIResponse( + response_status, + expected_status, + data, + headers=self.last_response_headers, + ) + + def _stream_api_response( + self, + response: HTTPResponse, + expected_status: int, + ) -> PrivXStreamResponse: + return PrivXStreamResponse( + response, + expected_status, + headers=self.last_response_headers, + ) + def _http_get( self, url_name: str, @@ -216,6 +272,8 @@ def _http_get( except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + headers = self._collect_headers(response) + self._store_response_headers(headers) self._store_response_cookies(response, request["url"]) return response.status, response.read() @@ -230,6 +288,8 @@ def _http_get_no_auth(self, url_name: str) -> Tuple: except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + headers = self._collect_headers(response) + self._store_response_headers(headers) self._store_response_cookies(response, request["url"]) return response.status, response.read() @@ -254,6 +314,8 @@ def _http_post( except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + headers = self._collect_headers(response) + self._store_response_headers(headers) self._store_response_cookies(response, request["url"]) return response.status, response.read() @@ -278,6 +340,8 @@ def _http_put( except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + headers = self._collect_headers(response) + self._store_response_headers(headers) self._store_response_cookies(response, request["url"]) return response.status, response.read() @@ -302,6 +366,8 @@ def _http_delete( except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + headers = self._collect_headers(response) + self._store_response_headers(headers) self._store_response_cookies(response, request["url"]) return response.status, response.read() @@ -325,6 +391,8 @@ def _http_stream( except (OSError, HTTPException) as e: raise InternalAPIException(e) response = conn.getresponse() + headers = self._collect_headers(response) + self._store_response_headers(headers) self._store_response_cookies(response, request["url"]) return response diff --git a/privx_api/connection_manager.py b/privx_api/connection_manager.py index b9ea1e8..f6dd02e 100644 --- a/privx_api/connection_manager.py +++ b/privx_api/connection_manager.py @@ -16,7 +16,7 @@ def get_connection_manager_service_status(self): PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.CONNECTION_MANAGER.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_connections( self, @@ -43,7 +43,7 @@ def get_connections( UrlEnum.CONNECTION_MANAGER.CONNECTIONS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_connections( self, @@ -73,7 +73,7 @@ def search_connections( query_params=search_params, body=get_value(connection_params, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_connection(self, connection_id: str) -> PrivXAPIResponse: """ @@ -86,7 +86,7 @@ def get_connection(self, connection_id: str) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.CONNECTION, path_params={"connection_id": connection_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_connection_tags( self, @@ -112,7 +112,7 @@ def get_connection_tags( UrlEnum.CONNECTION_MANAGER.CONNECTION_TAGS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_connection_tags( self, @@ -130,7 +130,7 @@ def update_connection_tags( path_params={"connection_id": connection_id}, body=get_value(connection_tags, []), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_trail_download_handle( self, connection_id: str, channel_id: str, file_id: str @@ -149,7 +149,7 @@ def create_trail_download_handle( "file_id": file_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def download_trail( self, @@ -175,7 +175,7 @@ def download_trail( "session_id": session_id, }, ) - return PrivXStreamResponse(response_obj, HTTPStatus.OK) + return self._stream_api_response(response_obj, HTTPStatus.OK) def create_trail_log_download_handle( self, connection_id: str, channel_id: str @@ -193,7 +193,7 @@ def create_trail_log_download_handle( "channel_id": channel_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def download_trail_log( self, @@ -223,7 +223,7 @@ def download_trail_log( }, query_params=search_params, ) - return PrivXStreamResponse(response_obj, HTTPStatus.OK) + return self._stream_api_response(response_obj, HTTPStatus.OK) def get_connection_access_roles(self, connection_id: str) -> PrivXAPIResponse: """ @@ -238,7 +238,7 @@ def get_connection_access_roles(self, connection_id: str) -> PrivXAPIResponse: "connection_id": connection_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def grant_access_role_to_connection( self, @@ -258,7 +258,7 @@ def grant_access_role_to_connection( "role_id": role_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def revoke_access_role_from_connection( self, @@ -278,7 +278,7 @@ def revoke_access_role_from_connection( "role_id": role_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def revoke_role_permissions_from_connections( self, @@ -296,7 +296,7 @@ def revoke_role_permissions_from_connections( "role_id": role_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def terminate_connection( self, @@ -316,7 +316,7 @@ def terminate_connection( }, body=termination_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def terminate_connection_by_host( self, @@ -336,7 +336,7 @@ def terminate_connection_by_host( }, body=termination_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def terminate_connection_by_user( self, @@ -356,7 +356,7 @@ def terminate_connection_by_user( }, body=termination_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) # UEBA Management @@ -370,7 +370,7 @@ def get_ueba_configurations(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.CONNECTION_MANAGER.UEBA_CONFIGURATIONS ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_ueba_configurations(self, payload) -> PrivXAPIResponse: """ @@ -383,7 +383,7 @@ def set_ueba_configurations(self, payload) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.UEBA_CONFIGURATIONS, body=get_value(payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_ueba_anomaly_settings(self) -> PrivXAPIResponse: """ @@ -395,7 +395,7 @@ def get_ueba_anomaly_settings(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.CONNECTION_MANAGER.UEBA_ANOMALY_SETTINGS ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_ueba_anomaly_settings(self, payload) -> PrivXAPIResponse: """ @@ -408,7 +408,7 @@ def set_ueba_anomaly_settings(self, payload) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.UEBA_ANOMALY_SETTINGS, body=get_value(payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def start_ueba_analysis(self, dataset_id: str) -> PrivXAPIResponse: """ @@ -423,7 +423,7 @@ def start_ueba_analysis(self, dataset_id: str) -> PrivXAPIResponse: "dataset_id": dataset_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def stop_ueba_analysis(self) -> PrivXAPIResponse: """ @@ -435,7 +435,7 @@ def stop_ueba_analysis(self) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.CONNECTION_MANAGER.STOP_ANALYSIS ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_id_for_ueba_script(self) -> PrivXAPIResponse: """ @@ -447,7 +447,7 @@ def create_id_for_ueba_script(self) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.CONNECTION_MANAGER.UEBA_SETUP_SCRIPT ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def download_ueba_script(self, session_id: str) -> PrivXAPIResponse: """ @@ -460,7 +460,7 @@ def download_ueba_script(self, session_id: str) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.DOWNLOAD_SCRIPT, path_params={"session_id": session_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_ueba_datasets(self) -> PrivXAPIResponse: """ @@ -470,7 +470,7 @@ def get_ueba_datasets(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.CONNECTION_MANAGER.UEBA_DATASETS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def save_ueba_datasets(self, dataset_definition) -> PrivXAPIResponse: """ @@ -483,10 +483,11 @@ def save_ueba_datasets(self, dataset_definition) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.UEBA_DATASETS, body=get_value(dataset_definition, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_ueba_dataset( - self, dataset_id: str, logs: bool, bin_count: int + self, + dataset_id: str, ) -> PrivXAPIResponse: """ Get UEBA dataset by id @@ -494,13 +495,11 @@ def get_ueba_dataset( Returns: PrivXAPIResponse """ - search_params = self._get_search_params(logs=logs, bin_count=bin_count) response_status, data = self._http_get( UrlEnum.CONNECTION_MANAGER.UEBA_DATASET, path_params={"dataset_id": dataset_id}, - query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_ueba_dataset( self, dataset_id: str, dataset_definition @@ -516,7 +515,7 @@ def update_ueba_dataset( path_params={"dataset_id": dataset_id}, body=get_value(dataset_definition, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_ueba_dataset(self, dataset_id: str) -> PrivXAPIResponse: """ @@ -529,7 +528,7 @@ def delete_ueba_dataset(self, dataset_id: str) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.UEBA_DATASET, path_params={"dataset_id": dataset_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def train_ueba_dataset( self, dataset_id: str, set_active_after_training: bool @@ -548,7 +547,7 @@ def train_ueba_dataset( path_params={"dataset_id": dataset_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_ueba_connection_count(self, payload) -> PrivXAPIResponse: """ @@ -561,7 +560,7 @@ def get_ueba_connection_count(self, payload) -> PrivXAPIResponse: UrlEnum.CONNECTION_MANAGER.UEBA_CONNECTION_COUNT, body=get_value(payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_ueba_status(self) -> PrivXAPIResponse: """ @@ -571,7 +570,7 @@ def get_ueba_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.CONNECTION_MANAGER.UEBA_STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_ueba_internal_status(self) -> PrivXAPIResponse: """ @@ -583,4 +582,4 @@ def get_ueba_internal_status(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.CONNECTION_MANAGER.UEBA_INTERNAL_STATUS ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/db_proxy.py b/privx_api/db_proxy.py index 2495c5e..29fceaf 100644 --- a/privx_api/db_proxy.py +++ b/privx_api/db_proxy.py @@ -18,7 +18,7 @@ def get_db_proxy_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.DB_PROXY.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_db_proxy_conf(self) -> PrivXAPIResponse: """ @@ -28,4 +28,4 @@ def get_db_proxy_conf(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.DB_PROXY.CONF) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/host_store.py b/privx_api/host_store.py index 0fde883..79d96f0 100644 --- a/privx_api/host_store.py +++ b/privx_api/host_store.py @@ -16,7 +16,7 @@ def get_host_store_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.HOST_STORE.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_hosts( self, @@ -46,7 +46,7 @@ def search_hosts( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_hosts( self, @@ -73,7 +73,7 @@ def get_hosts( response_status, data = self._http_get( UrlEnum.HOST_STORE.HOSTS, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_host(self, host: dict) -> PrivXAPIResponse: """ @@ -83,7 +83,7 @@ def create_host(self, host: dict) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_post(UrlEnum.HOST_STORE.HOSTS, body=host) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def resolve_host(self, host_resolve_params: dict) -> PrivXAPIResponse: """ @@ -96,7 +96,7 @@ def resolve_host(self, host_resolve_params: dict) -> PrivXAPIResponse: UrlEnum.HOST_STORE.RESOLVE, body=host_resolve_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_host(self, host_id: str) -> PrivXAPIResponse: """ @@ -109,7 +109,7 @@ def get_host(self, host_id: str) -> PrivXAPIResponse: UrlEnum.HOST_STORE.HOST, path_params={"host_id": host_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_host(self, host_id: str, host: dict) -> PrivXAPIResponse: """ @@ -121,7 +121,7 @@ def update_host(self, host_id: str, host: dict) -> PrivXAPIResponse: response_status, data = self._http_put( UrlEnum.HOST_STORE.HOST, path_params={"host_id": host_id}, body=host ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_host(self, host_id: str) -> PrivXAPIResponse: """ @@ -133,7 +133,7 @@ def delete_host(self, host_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.HOST_STORE.HOST, path_params={"host_id": host_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_host_deployable( self, host_id: str, deployable_params: dict @@ -149,7 +149,7 @@ def set_host_deployable( path_params={"host_id": host_id}, body=deployable_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_host_tags( self, @@ -174,7 +174,7 @@ def get_host_tags( UrlEnum.HOST_STORE.TAGS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_host_disabled_status( self, host_id: str, host_params: dict @@ -190,7 +190,7 @@ def set_host_disabled_status( path_params={"host_id": host_id}, body=host_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def resolve_host_realm(self, realm_params: dict) -> PrivXAPIResponse: """ @@ -204,7 +204,7 @@ def resolve_host_realm(self, realm_params: dict) -> PrivXAPIResponse: UrlEnum.HOST_STORE.REALM, body=realm_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_default_service_options(self) -> PrivXAPIResponse: """ @@ -216,7 +216,7 @@ def get_default_service_options(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.HOST_STORE.SETTINGS, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_command_restriction_whitelists( self, @@ -241,7 +241,7 @@ def get_command_restriction_whitelists( UrlEnum.HOST_STORE.WHITELISTS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_command_restriction_whitelist(self, whitelist: dict) -> PrivXAPIResponse: """ @@ -254,7 +254,7 @@ def create_command_restriction_whitelist(self, whitelist: dict) -> PrivXAPIRespo UrlEnum.HOST_STORE.WHITELISTS, body=whitelist, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_command_restriction_whitelist(self, whitelist_id: str) -> PrivXAPIResponse: """ @@ -267,7 +267,7 @@ def get_command_restriction_whitelist(self, whitelist_id: str) -> PrivXAPIRespon UrlEnum.HOST_STORE.WHITELIST, path_params={"whitelist_id": whitelist_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_command_restriction_whitelist( self, whitelist_id: str @@ -282,7 +282,7 @@ def delete_command_restriction_whitelist( UrlEnum.HOST_STORE.WHITELIST, path_params={"whitelist_id": whitelist_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_command_restriction_whitelist( self, whitelist_id: str, whitelist: dict @@ -298,7 +298,7 @@ def update_command_restriction_whitelist( path_params={"whitelist_id": whitelist_id}, body=whitelist, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_command_restriction_whitelists( self, @@ -330,7 +330,7 @@ def search_command_restriction_whitelists( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def eval_commands_against_whitelist( self, whitelist: dict, rshell_variant: str, cmds: [str] @@ -349,7 +349,7 @@ def eval_commands_against_whitelist( "commands": cmds, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_session_host_certificates( self, @@ -378,7 +378,7 @@ def get_session_host_certificates( path_params={"host_id": host_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_session_host_certificates(self, host_id: str) -> PrivXAPIResponse: """ @@ -392,7 +392,7 @@ def delete_session_host_certificates(self, host_id: str) -> PrivXAPIResponse: UrlEnum.HOST_STORE.SESSION_HOST_CERTS, path_params={"host_id": host_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_session_host_certificate( self, @@ -410,4 +410,4 @@ def delete_session_host_certificate( UrlEnum.HOST_STORE.SESSION_HOST_CERT, query_params={"host_id": host_id, "certificate_id": certificate_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/license_manager.py b/privx_api/license_manager.py index 59e6b33..705de34 100644 --- a/privx_api/license_manager.py +++ b/privx_api/license_manager.py @@ -14,7 +14,7 @@ def get_license_manager_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.LICENSE.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_license(self) -> PrivXAPIResponse: """ @@ -24,7 +24,7 @@ def get_license(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.LICENSE.LICENSE) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_license(self, license_code: str) -> PrivXAPIResponse: """ @@ -37,7 +37,7 @@ def set_license(self, license_code: str) -> PrivXAPIResponse: UrlEnum.LICENSE.LICENSE, body=license_code, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def refresh_license(self) -> PrivXAPIResponse: """ @@ -49,7 +49,7 @@ def refresh_license(self) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.LICENSE.REFRESH, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_license_analytics(self, license_params: dict) -> PrivXAPIResponse: """ @@ -62,7 +62,7 @@ def set_license_analytics(self, license_params: dict) -> PrivXAPIResponse: UrlEnum.LICENSE.OPT_IN, body=license_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def deactivate_license(self) -> PrivXAPIResponse: """ @@ -74,7 +74,7 @@ def deactivate_license(self) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.LICENSE.DEACTIVATE, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_mobilegw_registration_status(self) -> PrivXAPIResponse: """ @@ -84,7 +84,7 @@ def get_mobilegw_registration_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.LICENSE.MGW_STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def register_privx_to_mobilegw(self) -> PrivXAPIResponse: """ @@ -94,7 +94,7 @@ def register_privx_to_mobilegw(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status = self._http_post(UrlEnum.LICENSE.MGW_REGISTER) - return PrivXAPIResponse(response_status, HTTPStatus.OK, "") + return self._api_response(response_status, HTTPStatus.OK, "") def unregister_privx_from_mobilegw(self) -> PrivXAPIResponse: """ @@ -104,4 +104,4 @@ def unregister_privx_from_mobilegw(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status = self._http_post(UrlEnum.LICENSE.MGW_UNREGISTER) - return PrivXAPIResponse(response_status, HTTPStatus.OK, "") + return self._api_response(response_status, HTTPStatus.OK, "") diff --git a/privx_api/monitor_service.py b/privx_api/monitor_service.py index 419a3ad..8a84dd7 100644 --- a/privx_api/monitor_service.py +++ b/privx_api/monitor_service.py @@ -15,7 +15,7 @@ def get_monitor_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.MONITOR.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_privx_components_status(self) -> PrivXAPIResponse: """ @@ -25,7 +25,7 @@ def get_privx_components_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.MONITOR.COMPONENTS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_privx_component_status(self, hostname: str) -> PrivXAPIResponse: """ @@ -37,7 +37,7 @@ def get_privx_component_status(self, hostname: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.MONITOR.COMPONENT, path_params={"hostname": hostname} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_audit_events( self, @@ -71,7 +71,7 @@ def search_audit_events( query_params=search_params, body=audit_event_params if audit_event_params else {}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_audit_events( self, @@ -99,7 +99,7 @@ def get_audit_events( response_status, data = self._http_get( UrlEnum.MONITOR.AUDIT_EVENTS, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_audit_event_codes(self) -> PrivXAPIResponse: """ @@ -109,7 +109,7 @@ def get_audit_event_codes(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.MONITOR.AUDIT_EVENT_CODES) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_instance_status(self) -> PrivXAPIResponse: """ @@ -119,7 +119,7 @@ def get_instance_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.MONITOR.INSTANCE_STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def terminate_privx_instances(self) -> PrivXAPIResponse: """ @@ -129,4 +129,4 @@ def terminate_privx_instances(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_post(UrlEnum.MONITOR.TERMINATE_INSTANCES) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/network_access_manager.py b/privx_api/network_access_manager.py index ed0269b..7cd79f8 100644 --- a/privx_api/network_access_manager.py +++ b/privx_api/network_access_manager.py @@ -16,7 +16,7 @@ def get_network_manager_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.NETWORK_ACCESS_MANAGER.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_network_target( self, @@ -26,7 +26,7 @@ def get_network_target( UrlEnum.NETWORK_ACCESS_MANAGER.NETWORK_TARGET, path_params={"network_target_id": network_target_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_network_targets( self, @@ -34,27 +34,25 @@ def get_network_targets( limit: Optional[int] = None, sort_key: Optional[str] = None, sort_dir: Optional[str] = None, - filter_param: Optional[str] = None, ): get_params = self._get_search_params( offset=offset, limit=limit, sortkey=sort_key, sortdir=sort_dir, - filter=filter_param, ) response_status, data = self._http_get( UrlEnum.NETWORK_ACCESS_MANAGER.NETWORK_TARGETS, query_params=get_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_network_target(self, network_target: dict): response_status, data = self._http_post( UrlEnum.NETWORK_ACCESS_MANAGER.NETWORK_TARGETS, body=network_target, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def update_network_target(self, network_target_id: str, network_target: dict): response_status, data = self._http_put( @@ -62,7 +60,7 @@ def update_network_target(self, network_target_id: str, network_target: dict): path_params={"network_target_id": network_target_id}, body=network_target, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def disable_network_target(self, network_target_id: str, disabled: dict): response_status, data = self._http_put( @@ -70,14 +68,14 @@ def disable_network_target(self, network_target_id: str, disabled: dict): path_params={"network_target_id": network_target_id}, body=disabled, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_network_target(self, network_target_id: str): response_status, data = self._http_delete( UrlEnum.NETWORK_ACCESS_MANAGER.NETWORK_TARGET, path_params={"network_target_id": network_target_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_network_targets( self, @@ -107,4 +105,4 @@ def search_network_targets( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/response.py b/privx_api/response.py index ac6f344..a529108 100644 --- a/privx_api/response.py +++ b/privx_api/response.py @@ -1,103 +1,143 @@ import http.client import json -from typing import NoReturn +from typing import Any, Generator, NoReturn, Optional, Union from privx_api.exceptions import InternalAPIException class BaseResponse: - def __init__(self) -> None: - self._ok = None - self._data = None + """Common response metadata shared by buffered and streamed responses.""" + + def __init__( + self, + status: int, + ok: bool, + headers: Optional[dict] = None, + ) -> None: + """Store HTTP status code, success flag, and response headers.""" + self._status = status + self._ok = ok + self._headers = self._normalize_headers(headers or {}) @property def ok(self) -> bool: - """ - Returns: - Boolean wheter the call was successful or not - """ + """True when response status matches the expected status.""" return self._ok + @property + def status(self) -> int: + """Actual HTTP status code returned by the server.""" + return self._status + + @property + def headers(self) -> dict: + """HTTP response headers as a normalized lowercase-key mapping.""" + return dict(self._headers) + @property def data(self) -> NoReturn: + """Response payload accessor implemented by concrete response classes.""" raise NotImplementedError - def _get_json(self, json_data: bytes) -> dict: - try: - data = json.loads(json_data) - except ValueError: - data = {} - return data + @staticmethod + def _normalize_headers(headers: dict) -> dict: + """Normalize header keys to lowercase for case-insensitive lookup.""" + return {str(k).lower(): v for k, v in headers.items()} class PrivXAPIResponse(BaseResponse): - """ - Response object for PrivX API library. + """Buffered response for standard API methods (`GET`/`POST`/`PUT`/`DELETE`). - Args: - response_status: The response object to handle. - expected_status: The expected response status. - data: data from the response.read() method. + Use this class when the full body is read into memory and callers need + convenience helpers such as `.data` and `.content`. """ def __init__( - self, response_status: http.HTTPStatus, expected_status: int, data: bytes + self, + response_status: int, + expected_status: int, + data: Union[bytes, str, None], + headers: Optional[dict] = None, ) -> None: - super().__init__() - self.status = response_status - if expected_status == self.status: - self._ok = True - self._data = self._get_json(data) + """Build a buffered response and derive backward-compatible `.data`.""" + ok = response_status == expected_status + super().__init__(response_status, ok, headers=headers) + self._raw = self._to_bytes(data) + if ok: + self._data = self._get_json(self._raw) else: - self._ok = False - response_struct = { - "status": self.status, - "details": self._get_json(data), + self._data = { + "status": response_status, + "details": self._get_json(self._raw), } - self._data = response_struct def __str__(self) -> str: - return "PrivXResponse {}".format(self.status) + """Readable response representation for logs/debugging.""" + return f"PrivXResponse {self._status}" @property def data(self) -> dict: - """ - Returns: - The response data + """SDK-normalized payload kept for backward compatibility. + + Success responses return parsed JSON content. + Non-success responses return `{"status": ..., "details": ...}`. """ return self._data + @property + def content(self) -> bytes: + """Raw response body bytes.""" + return self._raw + + @staticmethod + def _get_json(json_data: bytes) -> dict: + """Parse JSON body and return empty dict for invalid/empty JSON.""" + try: + return json.loads(json_data) + except ValueError: + return {} + + @staticmethod + def _to_bytes(data: Union[bytes, str, None]) -> bytes: + """Normalize supported body input types to bytes.""" + if data is None: + return b"" + if isinstance(data, bytes): + return data + return data.encode("utf-8") + class PrivXStreamResponse(BaseResponse): - """ - Example: - with open("test.txt", "w") as file: - for char in StreamResponseObject.iter_content(chunk_size): - file.write(char.decode("utf-8")) + """Streaming response for endpoints that should be consumed incrementally. + + Use this class for large files/artifacts where reading the whole response + into memory would be unnecessary or expensive. """ def __init__( self, response: http.client.HTTPResponse, expected_status: int, + headers: Optional[dict] = None, ) -> None: - super().__init__() + """Wrap an open `HTTPResponse` and expose a chunk iterator.""" + ok = response.status == expected_status + super().__init__(response.status, ok, headers=headers) self._response = response - self._status = response.status - self._ok = self._status == expected_status def __str__(self) -> str: - return "PrivXStreamResponse {}".format(self._status) + """Readable stream response representation for logs/debugging.""" + return f"PrivXStreamResponse {self._status}" @property def data(self) -> NoReturn: + """Stream responses do not support full in-memory payload access.""" raise InternalAPIException("Should not access all data in a stream response") - def iter_content(self, chunk_size: int = 1024 * 1024) -> bytes: - """ - Generator for reading and returning response by chunk - """ - + def iter_content( + self, chunk_size: int = 1024 * 1024 + ) -> Generator[bytes, Any, None]: + """Yield response bytes by chunk and always close the socket at end.""" try: while True: chunk = self._response.read(chunk_size) diff --git a/privx_api/role_store.py b/privx_api/role_store.py index 8dafadc..dd49906 100644 --- a/privx_api/role_store.py +++ b/privx_api/role_store.py @@ -15,7 +15,7 @@ def get_role_store_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.ROLE_STORE.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_sources(self) -> PrivXAPIResponse: """ @@ -25,7 +25,7 @@ def get_sources(self) -> PrivXAPIResponse: PrivxAPIResponse """ response_status, data = self._http_get(UrlEnum.ROLE_STORE.SOURCES) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_source(self, source_params: dict) -> PrivXAPIResponse: """ @@ -38,7 +38,7 @@ def create_source(self, source_params: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.SOURCES, body=source_params ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_source(self, source_id: str) -> PrivXAPIResponse: """ @@ -50,7 +50,7 @@ def get_source(self, source_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.SOURCE, path_params={"source_id": source_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_source(self, source_id: str) -> PrivXAPIResponse: """ @@ -62,7 +62,7 @@ def delete_source(self, source_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.ROLE_STORE.SOURCE, path_params={"source_id": source_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_source(self, source_id: str, source_params: dict) -> PrivXAPIResponse: """ @@ -76,7 +76,7 @@ def update_source(self, source_id: str, source_params: dict) -> PrivXAPIResponse path_params={"source_id": source_id}, body=source_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def refresh_source(self, source_ids: list) -> PrivXAPIResponse: """ @@ -89,7 +89,7 @@ def refresh_source(self, source_ids: list) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.REFRESH_SOURCES, body=source_ids ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_aws_role_links(self, refresh: bool = False) -> PrivXAPIResponse: """ @@ -102,7 +102,7 @@ def get_aws_role_links(self, refresh: bool = False) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.AWS_ROLES, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_aws_role_link(self, aws_role_id: str) -> PrivXAPIResponse: """ @@ -114,7 +114,7 @@ def get_aws_role_link(self, aws_role_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.AWS_ROLE, path_params={"awsrole_id": aws_role_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_aws_role_link(self, aws_role_id: str) -> PrivXAPIResponse: """ @@ -128,7 +128,7 @@ def delete_aws_role_link(self, aws_role_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.ROLE_STORE.AWS_ROLE, path_params={"awsrole_id": aws_role_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_aws_role_link( self, aws_role_id: str, privx_roles: list @@ -144,7 +144,7 @@ def update_aws_role_link( path_params={"awsrole_id": aws_role_id}, body=privx_roles, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_linked_roles(self, aws_role_id: str) -> PrivXAPIResponse: """ @@ -157,7 +157,7 @@ def get_linked_roles(self, aws_role_id: str) -> PrivXAPIResponse: UrlEnum.ROLE_STORE.AWS_ROLE_PRIVX_ROLES, path_params={"awsrole_id": aws_role_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_roles( self, @@ -178,7 +178,7 @@ def get_roles( response_status, data = self._http_get( UrlEnum.ROLE_STORE.ROLES, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_role(self, role: dict) -> PrivXAPIResponse: """ @@ -188,7 +188,7 @@ def create_role(self, role: dict) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_post(UrlEnum.ROLE_STORE.ROLES, body=role) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def resolve_roles(self, role_names: list) -> PrivXAPIResponse: """ @@ -200,7 +200,7 @@ def resolve_roles(self, role_names: list) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.RESOLVE, body=role_names ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def evaluate_role(self, role_params: dict) -> PrivXAPIResponse: """ @@ -212,7 +212,7 @@ def evaluate_role(self, role_params: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.EVALUATE, body=role_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_role(self, role_id: str) -> PrivXAPIResponse: """ @@ -224,7 +224,7 @@ def get_role(self, role_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.ROLE, path_params={"role_id": role_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_role(self, role_id: str) -> PrivXAPIResponse: """ @@ -236,7 +236,7 @@ def delete_role(self, role_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.ROLE_STORE.ROLE, path_params={"role_id": role_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_role(self, role_id: str, role: dict) -> PrivXAPIResponse: """ @@ -248,7 +248,7 @@ def update_role(self, role_id: str, role: dict) -> PrivXAPIResponse: response_status, data = self._http_put( UrlEnum.ROLE_STORE.ROLE, path_params={"role_id": role_id}, body=role ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_role_members( self, @@ -272,7 +272,7 @@ def get_role_members( path_params={"role_id": role_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_aws_token( self, aws_role_id: str, ttl: int = 900, token_code: Optional[str] = None @@ -296,7 +296,7 @@ def get_aws_token( path_params={"awsrole_id": aws_role_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_principal_keys(self, role_id: str) -> PrivXAPIResponse: """ @@ -308,7 +308,7 @@ def get_principal_keys(self, role_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.PRINCIPAL_KEYS, path_params={"role_id": role_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def generate_principal_key(self, role_id: str) -> PrivXAPIResponse: """ @@ -320,7 +320,7 @@ def generate_principal_key(self, role_id: str) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.GENERATE_PRINCIPAL_KEY, path_params={"role_id": role_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def import_principal_key(self, role_id: str, primary_key: dict) -> PrivXAPIResponse: """ @@ -334,7 +334,7 @@ def import_principal_key(self, role_id: str, primary_key: dict) -> PrivXAPIRespo path_params={"role_id": role_id}, body=primary_key, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_principal_key(self, role_id: str, key_id: str) -> PrivXAPIResponse: """ @@ -347,7 +347,7 @@ def get_principal_key(self, role_id: str, key_id: str) -> PrivXAPIResponse: UrlEnum.ROLE_STORE.PRINCIPAL_KEY, path_params={"role_id": role_id, "key_id": key_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_principal_key(self, role_id: str, key_id: str) -> PrivXAPIResponse: """ @@ -360,7 +360,7 @@ def delete_principal_key(self, role_id: str, key_id: str) -> PrivXAPIResponse: UrlEnum.ROLE_STORE.PRINCIPAL_KEY, path_params={"role_id": role_id, "key_id": key_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user(self, user_id: str) -> PrivXAPIResponse: """ @@ -372,7 +372,7 @@ def get_user(self, user_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.USERS, path_params={"user_id": user_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_settings(self, user_id: str) -> PrivXAPIResponse: """ @@ -384,7 +384,7 @@ def get_user_settings(self, user_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.USER_SETTINGS, path_params={"user_id": user_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_user_settings(self, user_id: str, settings: dict) -> PrivXAPIResponse: """ @@ -398,7 +398,7 @@ def set_user_settings(self, user_id: str, settings: dict) -> PrivXAPIResponse: path_params={"user_id": user_id}, body=settings, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_roles(self, user_id: str) -> PrivXAPIResponse: """ @@ -410,7 +410,7 @@ def get_user_roles(self, user_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.USER_ROLES, path_params={"user_id": user_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_user_roles(self, user_id: str, roles: list) -> PrivXAPIResponse: """ @@ -422,7 +422,7 @@ def set_user_roles(self, user_id: str, roles: list) -> PrivXAPIResponse: response_status, data = self._http_put( UrlEnum.ROLE_STORE.USER_ROLES, path_params={"user_id": user_id}, body=roles ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def enable_user_mfa(self, user_ids: list) -> PrivXAPIResponse: """ @@ -434,7 +434,7 @@ def enable_user_mfa(self, user_ids: list) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.ENABLE_MFA, body=user_ids ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def disable_user_mfa(self, user_ids: list) -> PrivXAPIResponse: """ @@ -446,7 +446,7 @@ def disable_user_mfa(self, user_ids: list) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.DISABLE_MFA, body=user_ids ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def reset_user_mfa(self, user_ids: list) -> PrivXAPIResponse: """ @@ -458,7 +458,7 @@ def reset_user_mfa(self, user_ids: list) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.RESET_MFA, body=user_ids ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def resolve_user(self, user_id: str) -> PrivXAPIResponse: """ @@ -470,7 +470,7 @@ def resolve_user(self, user_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.RESOLVE_USER, path_params={"user_id": user_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_users( self, @@ -490,7 +490,7 @@ def search_users( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_roles( self, @@ -510,7 +510,7 @@ def search_roles( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_external_users(self, external_params: dict) -> PrivXAPIResponse: """ @@ -519,7 +519,7 @@ def search_external_users(self, external_params: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.EXTERNAL_SEARCH, body=external_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_authorized_keys(self, user_id: str) -> PrivXAPIResponse: """ @@ -531,7 +531,7 @@ def get_user_authorized_keys(self, user_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.USER_AUTHORIZED_KEYS, path_params={"user_id": user_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_user_authorize_key( self, user_id: str, key_params: dict @@ -544,7 +544,7 @@ def create_user_authorize_key( path_params={"user_id": user_id}, body=key_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_user_authorized_key(self, user_id: str, key_id: str) -> PrivXAPIResponse: """ @@ -557,7 +557,7 @@ def get_user_authorized_key(self, user_id: str, key_id: str) -> PrivXAPIResponse UrlEnum.ROLE_STORE.USER_AUTHORIZED_KEY_ID, path_params={"user_id": user_id, "key_id": key_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_user_authorized_key( self, user_id: str, key_id: str, key_params: dict @@ -573,7 +573,7 @@ def update_user_authorized_key( path_params={"user_id": user_id, "key_id": key_id}, body=key_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_user_authorized_key(self, user_id: str, key_id: str) -> PrivXAPIResponse: """ @@ -586,7 +586,7 @@ def delete_user_authorized_key(self, user_id: str, key_id: str) -> PrivXAPIRespo UrlEnum.ROLE_STORE.USER_AUTHORIZED_KEY_ID, path_params={"user_id": user_id, "key_id": key_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_log_collectors(self) -> PrivXAPIResponse: """ @@ -596,7 +596,7 @@ def get_log_collectors(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.ROLE_STORE.LOG_CONF_COLLECTORS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_log_collector(self, collector: dict) -> PrivXAPIResponse: """ @@ -608,7 +608,7 @@ def create_log_collector(self, collector: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.LOG_CONF_COLLECTORS, body=collector ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_log_collector(self, collector_id: str) -> PrivXAPIResponse: """ @@ -621,7 +621,7 @@ def get_log_collector(self, collector_id: str) -> PrivXAPIResponse: UrlEnum.ROLE_STORE.LOG_CONF_COLLECTOR, path_params={"collector_id": collector_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_log_collector( self, collector_id: str, collector_params: dict @@ -637,7 +637,7 @@ def update_log_collector( path_params={"collector_id": collector_id}, body=collector_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_log_collector(self, collector_id: str) -> PrivXAPIResponse: """ @@ -650,7 +650,7 @@ def delete_log_collector(self, collector_id: str) -> PrivXAPIResponse: UrlEnum.ROLE_STORE.LOG_CONF_COLLECTOR, path_params={"collector_id": collector_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_all_authorized_keys( self, @@ -671,7 +671,7 @@ def get_all_authorized_keys( response_status, data = self._http_get( UrlEnum.ROLE_STORE.ALL_AUTHORIZED_KEYS, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def resolve_authorized_keys(self, authorized_keys: dict) -> PrivXAPIResponse: """ @@ -683,7 +683,7 @@ def resolve_authorized_keys(self, authorized_keys: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.ROLE_STORE.RESOLVE_AUTHORIZED_KEYS, body=authorized_keys ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def list_all_idendity_providers( self, offset: Optional[int] = None, limit: Optional[int] = None @@ -698,7 +698,7 @@ def list_all_idendity_providers( response_status, data = self._http_get( UrlEnum.ROLE_STORE.IDENDITY_PROVIDERS, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_idendity_provider_by_id(self, id: str) -> PrivXAPIResponse: """ @@ -710,7 +710,7 @@ def get_idendity_provider_by_id(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.ROLE_STORE.IDENDITY_PROVIDERS_ID, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_idendity_provider( self, idendity_provider_params: dict @@ -724,7 +724,7 @@ def create_idendity_provider( response_status, data = self._http_post( UrlEnum.ROLE_STORE.IDENDITY_PROVIDERS, body=idendity_provider_params ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def delete_idendity_provider(self, id: str) -> PrivXAPIResponse: """ @@ -736,7 +736,7 @@ def delete_idendity_provider(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.ROLE_STORE.IDENDITY_PROVIDERS_ID, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_idendity_provider( self, id: str, idendity_provider_params: dict @@ -752,7 +752,7 @@ def update_idendity_provider( path_params={"id": id}, body=idendity_provider_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_idendity_providers( self, @@ -778,4 +778,4 @@ def search_idendity_providers( query_params=search_params, body=get_value(keywords, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/secrets_manager.py b/privx_api/secrets_manager.py index c2f4a7a..14688cd 100644 --- a/privx_api/secrets_manager.py +++ b/privx_api/secrets_manager.py @@ -16,7 +16,7 @@ def get_secrets_manager_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.SECRETS_MANAGER.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_password_policies(self) -> PrivXAPIResponse: """ @@ -28,7 +28,7 @@ def get_password_policies(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.SECRETS_MANAGER.PASSWORD_POLICIES ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_password_policy(self, source_params: dict) -> PrivXAPIResponse: """ @@ -41,7 +41,7 @@ def create_password_policy(self, source_params: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.SECRETS_MANAGER.CREATE_PASSWORD_POLICY, body=source_params ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_password_policy(self, id: str) -> PrivXAPIResponse: """ @@ -53,7 +53,7 @@ def get_password_policy(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.SECRETS_MANAGER.PASSWORD_POLICY, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_password_policy(self, id: str, policy_params: dict) -> PrivXAPIResponse: """ @@ -67,7 +67,7 @@ def update_password_policy(self, id: str, policy_params: dict) -> PrivXAPIRespon path_params={"id": id}, body=policy_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_password_policy(self, id: str) -> PrivXAPIResponse: """ @@ -79,7 +79,7 @@ def delete_password_policy(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.SECRETS_MANAGER.PASSWORD_POLICY, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def rotate_host_password(self, host_id: str, account: str) -> PrivXAPIResponse: """ @@ -92,7 +92,7 @@ def rotate_host_password(self, host_id: str, account: str) -> PrivXAPIResponse: UrlEnum.SECRETS_MANAGER.ROTATE_HOST_PASSWORD, path_params={"host_id": host_id, "account": account}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_script_templates(self) -> PrivXAPIResponse: """ @@ -102,7 +102,7 @@ def get_script_templates(self) -> PrivXAPIResponse: PrivxAPIResponse """ response_status, data = self._http_get(UrlEnum.SECRETS_MANAGER.SCRIPT_TEMPLATES) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_script_template(self, template_params: dict) -> PrivXAPIResponse: """ @@ -115,7 +115,7 @@ def create_script_template(self, template_params: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.SECRETS_MANAGER.CREATE_SCRIPT_TEMPLATE, body=template_params ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_script_template(self, id: str) -> PrivXAPIResponse: """ @@ -127,7 +127,7 @@ def get_script_template(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.SECRETS_MANAGER.SCRIPT_TEMPLATE, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_script_template( self, id: str, template_params: dict @@ -143,7 +143,7 @@ def update_script_template( path_params={"id": id}, body=template_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_script_template(self, id: str) -> PrivXAPIResponse: """ @@ -155,7 +155,7 @@ def delete_script_template(self, id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.SECRETS_MANAGER.SCRIPT_TEMPLATE, path_params={"id": id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def compile_script_template(self, template_params: dict) -> PrivXAPIResponse: """ @@ -168,7 +168,7 @@ def compile_script_template(self, template_params: dict) -> PrivXAPIResponse: UrlEnum.SECRETS_MANAGER.COMPILE_SCRIPT_TEMPLATE, body=template_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_target_domains( self, @@ -190,7 +190,7 @@ def get_target_domains( UrlEnum.SECRETS_MANAGER.TARGET_DOMAINS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_target_domain(self, td_params: dict) -> PrivXAPIResponse: """ @@ -203,7 +203,7 @@ def create_target_domain(self, td_params: dict) -> PrivXAPIResponse: response_status, data = self._http_post( UrlEnum.SECRETS_MANAGER.TARGET_DOMAINS, body=td_params ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def search_target_domain( self, @@ -223,7 +223,7 @@ def search_target_domain( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_target_domain(self, target_domain_id: str) -> PrivXAPIResponse: """ @@ -236,7 +236,7 @@ def get_target_domain(self, target_domain_id: str) -> PrivXAPIResponse: UrlEnum.SECRETS_MANAGER.TARGET_DOMAIN, path_params={"target_domain_id": target_domain_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_target_domain( self, target_domain_id: str, td_params: dict @@ -252,7 +252,7 @@ def update_target_domain( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_target_domain(self, target_domain_id: str) -> PrivXAPIResponse: """ @@ -265,7 +265,7 @@ def delete_target_domain(self, target_domain_id: str) -> PrivXAPIResponse: UrlEnum.SECRETS_MANAGER.TARGET_DOMAIN, path_params={"target_domain_id": target_domain_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def refresh_target_domain(self, target_domain_id: str) -> PrivXAPIResponse: """ @@ -278,7 +278,7 @@ def refresh_target_domain(self, target_domain_id: str) -> PrivXAPIResponse: UrlEnum.SECRETS_MANAGER.TARGET_DOMAIN, path_params={"target_domain_id": target_domain_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_target_domain_accounts( self, @@ -302,7 +302,7 @@ def get_target_domain_accounts( path_params={"target_domain_id": target_domain_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_target_domain_accounts( self, @@ -329,7 +329,7 @@ def search_target_domain_accounts( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_target_domain_account( self, target_domain_id: str, account_id: str @@ -347,7 +347,7 @@ def get_target_domain_account( "account_id": account_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_target_domain_account( self, target_domain_id: str, account_id: str, td_params: dict @@ -366,7 +366,7 @@ def update_target_domain_account( }, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def batch_update_target_domain_account( self, target_domain_id: str, td_params: dict @@ -382,7 +382,7 @@ def batch_update_target_domain_account( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_target_domain_managed_accounts( self, @@ -406,7 +406,7 @@ def get_target_domain_managed_accounts( path_params={"target_domain_id": target_domain_id}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_target_domain_managed_account( self, target_domain_id: str, td_params: dict @@ -423,7 +423,7 @@ def create_target_domain_managed_account( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def search_target_domain_managed_accounts( self, @@ -450,7 +450,7 @@ def search_target_domain_managed_accounts( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_target_domain_managed_account( self, target_domain_id: str, managed_account_id: str @@ -468,7 +468,7 @@ def get_target_domain_managed_account( "managed_account_id": managed_account_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_target_domain_managed_account( self, target_domain_id: str, managed_account_id: str, td_params: dict @@ -487,7 +487,7 @@ def update_target_domain_managed_account( }, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_target_domain_managed_account( self, target_domain_id: str, managed_account_id: str @@ -505,7 +505,7 @@ def delete_target_domain_managed_account( "managed_account_id": managed_account_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def rotate_target_domain_managed_account( self, target_domain_id: str, managed_account_id: str @@ -523,7 +523,7 @@ def rotate_target_domain_managed_account( "managed_account_id": managed_account_id, }, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def resolve_target_domains( self, @@ -539,7 +539,7 @@ def resolve_target_domains( UrlEnum.SECRETS_MANAGER.RESOLVE_TARGET_DOMAINS, body=target_domain_names, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def password_target_domain_managed_account( self, target_domain_id: str, managed_account_id: str, td_params: dict @@ -558,7 +558,7 @@ def password_target_domain_managed_account( }, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def batch_create_target_domain_managed_account( self, target_domain_id: str, td_params: dict @@ -574,7 +574,7 @@ def batch_create_target_domain_managed_account( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def batch_update_target_domain_managed_account( self, target_domain_id: str, td_params: dict @@ -590,7 +590,7 @@ def batch_update_target_domain_managed_account( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def batch_delete_target_domain_managed_account( self, target_domain_id: str, td_params: dict @@ -606,7 +606,7 @@ def batch_delete_target_domain_managed_account( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def batch_rotate_target_domain_managed_account( self, target_domain_id: str, td_params: dict @@ -622,4 +622,4 @@ def batch_rotate_target_domain_managed_account( path_params={"target_domain_id": target_domain_id}, body=td_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/settings_service.py b/privx_api/settings_service.py index d71a880..2c20877 100644 --- a/privx_api/settings_service.py +++ b/privx_api/settings_service.py @@ -19,7 +19,7 @@ def restart_required(self, scope: str, scope_params: dict) -> PrivXAPIResponse: path_params={"scope": scope}, body=scope_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_settings_service_status(self) -> PrivXAPIResponse: """ @@ -29,7 +29,7 @@ def get_settings_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.SETTINGS.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_scope_settings( self, scope: str, merge: Optional[bool] = None @@ -45,7 +45,7 @@ def get_scope_settings( path_params={"scope": scope}, query_params=self._get_search_params(merge=merge), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_scope_settings(self, scope: str, scope_params: dict) -> PrivXAPIResponse: """ @@ -59,7 +59,7 @@ def update_scope_settings(self, scope: str, scope_params: dict) -> PrivXAPIRespo path_params={"scope": scope}, body=scope_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_scope_section_settings(self, scope: str, section: str) -> PrivXAPIResponse: """ @@ -72,7 +72,7 @@ def get_scope_section_settings(self, scope: str, section: str) -> PrivXAPIRespon UrlEnum.SETTINGS.SCOPE_SECTION, path_params={"scope": scope, "section": section}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_scope_section_settings( self, scope: str, section: str, section_params: dict @@ -88,7 +88,7 @@ def update_scope_section_settings( path_params={"scope": scope, "section": section}, body=section_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_scope_schema(self, scope: str) -> PrivXAPIResponse: """ @@ -101,7 +101,7 @@ def get_scope_schema(self, scope: str) -> PrivXAPIResponse: UrlEnum.SETTINGS.SCOPE_SCHEMA, path_params={"scope": scope}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_scope_section_schema(self, scope: str, section: str) -> PrivXAPIResponse: """ @@ -114,4 +114,4 @@ def get_scope_section_schema(self, scope: str, section: str) -> PrivXAPIResponse UrlEnum.SETTINGS.SCOPE_SECTION_SCHEMA, path_params={"scope": scope, "section": section}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/tests/test_api.py b/privx_api/tests/test_api.py index 6bdc82d..48406e5 100644 --- a/privx_api/tests/test_api.py +++ b/privx_api/tests/test_api.py @@ -1,3 +1,5 @@ +from enum import Enum +from http import HTTPStatus from unittest import mock import pytest @@ -115,6 +117,112 @@ def test_make_body_params(value, expected_value): assert api._make_body_params(value) == expected_value +class DummyEnum(Enum): + SAMPLE = "VaLue" + + +def test_get_search_params_coercion(): + api = PrivXAPI("", 0, "", "", "") + + params = api._get_search_params( + truthy=True, + falsy=False, + mixed_case="FoObAr", + keep_int=5, + skip_none=None, + ) + + assert params == { + "truthy": "true", + "falsy": "false", + "mixed_case": "FoObAr", + "keep_int": 5, + } + + +def test_get_search_params_normalizes_common_query_params_for_server_compat(): + api = PrivXAPI("", 0, "", "", "") + + params = api._get_search_params( + filter="AcCeSsIbLe", + sortdir="ASC", + query="SomeText", + verbose=True, + fuzzy_count=bool(23), + ) + + assert params == { + "filter": "AcCeSsIbLe", + "sortdir": "ASC", + "query": "SomeText", + "verbose": "true", + "fuzzy_count": "true", + } + + +class DummyHTTPResponse: + def __init__(self, payload: bytes, *, status: HTTPStatus = HTTPStatus.OK) -> None: + self._payload = payload + self.status = status + + def read(self, chunk_size: int = -1) -> bytes: + if chunk_size == -1: + data = self._payload + self._payload = b"" + return data + chunk = self._payload[:chunk_size] + self._payload = self._payload[chunk_size:] + return chunk + + def close(self) -> None: + return None + + +def test_api_response_helper_passes_last_response_headers(): + api = PrivXAPI("", 0, "", "", "") + api._store_response_headers({"Content-Type": "application/json"}) + + resp = api._api_response(HTTPStatus.OK, HTTPStatus.OK, b'{"k": "v"}') + + assert resp.headers["content-type"] == "application/json" + assert resp.status == HTTPStatus.OK + assert resp.ok is True + + +def test_stream_response_helper_passes_last_response_headers(): + api = PrivXAPI("", 0, "", "", "") + api._store_response_headers({"X-Request-Id": "req-123"}) + + resp = api._stream_api_response(DummyHTTPResponse(b"abc"), HTTPStatus.OK) + + assert resp.headers["x-request-id"] == "req-123" + assert resp.status == HTTPStatus.OK + assert resp.ok is True + + +def test_api_response_helper_marks_non_expected_status_as_not_ok(): + api = PrivXAPI("", 0, "", "", "") + api._store_response_headers({"Content-Type": "application/json"}) + + resp = api._api_response(HTTPStatus.BAD_REQUEST, HTTPStatus.OK, b'{"error":"x"}') + + assert resp.ok is False + assert resp.data == {"status": HTTPStatus.BAD_REQUEST, "details": {"error": "x"}} + + +def test_stream_response_helper_marks_non_expected_status_as_not_ok(): + api = PrivXAPI("", 0, "", "", "") + api._store_response_headers({"X-Request-Id": "req-123"}) + + resp = api._stream_api_response( + DummyHTTPResponse(b"abc", status=HTTPStatus.BAD_REQUEST), + HTTPStatus.OK, + ) + + assert resp.ok is False + assert resp.status == HTTPStatus.BAD_REQUEST + + @pytest.mark.parametrize( "params, exp_result", [ diff --git a/privx_api/tests/test_response.py b/privx_api/tests/test_response.py new file mode 100644 index 0000000..7775e11 --- /dev/null +++ b/privx_api/tests/test_response.py @@ -0,0 +1,148 @@ +import io +from http import HTTPStatus +from typing import Union + +import pytest + +from privx_api.exceptions import InternalAPIException +from privx_api.response import PrivXAPIResponse, PrivXStreamResponse + + +class DummyHTTPResponse: + def __init__( + self, payload: bytes, *, status: Union[int, HTTPStatus] = HTTPStatus.OK + ) -> None: + self._payload = io.BytesIO(payload) + self.status = status + + def read(self, chunk_size: int = -1) -> bytes: + return self._payload.read(chunk_size) + + def close(self) -> None: + self._payload.close() + + +class DummyAPI: + def __init__(self): + self.last_response_headers = {} + + def response( + self, status=HTTPStatus.OK, expected=HTTPStatus.OK, body=b"", headers=None + ): + self.last_response_headers = headers or {} + return PrivXAPIResponse( + status, expected, body, headers=headers or self.last_response_headers + ) + + def stream_response(self, payload=b"", headers=None): + self.last_response_headers = headers or {} + dummy_http_resp = DummyHTTPResponse(payload) + return PrivXStreamResponse( + dummy_http_resp, HTTPStatus.OK, headers=self.last_response_headers + ) + + +def test_api_response_content_and_text(): + api = DummyAPI() + resp = api.response( + body=b"secret", headers={"Content-Type": "text/plain; charset=utf-8"} + ) + + assert resp.ok is True + assert resp.content == b"secret" + + +def test_api_response_json_detection(): + api = DummyAPI() + resp = api.response( + body=b'{"key": "value"}', headers={"Content-Type": "application/json"} + ) + + assert resp.data == {"key": "value"} + + +def test_api_response_data_on_non_expected_status_keeps_error_details(): + api = DummyAPI() + resp = api.response( + status=HTTPStatus.BAD_REQUEST, + expected=HTTPStatus.OK, + body=b'{"message":"bad request"}', + headers={"Content-Type": "application/json"}, + ) + + assert resp.ok is False + assert resp.data == { + "status": HTTPStatus.BAD_REQUEST, + "details": {"message": "bad request"}, + } + + +def test_api_response_data_on_non_expected_status_uses_empty_details_for_invalid_json(): + api = DummyAPI() + resp = api.response( + status=HTTPStatus.BAD_REQUEST, + expected=HTTPStatus.OK, + body=b"not-json", + headers={"Content-Type": "application/json"}, + ) + + assert resp.ok is False + assert resp.data == {"status": HTTPStatus.BAD_REQUEST, "details": {}} + + +def test_stream_response_iter_content(): + api = DummyAPI() + resp = api.stream_response(payload=b"abcdef") + chunks = list(resp.iter_content(2)) + assert chunks == [b"ab", b"cd", b"ef"] + + +def test_stream_response_data_property_raises(): + api = DummyAPI() + resp = api.stream_response(payload=b"abcdef") + + with pytest.raises(InternalAPIException): + _ = resp.data + + +def test_stream_response_keeps_non_standard_status_code(): + response = DummyHTTPResponse(b"payload", status=599) + stream_response = PrivXStreamResponse(response, HTTPStatus.OK, headers={}) + + assert stream_response.status == 599 + assert stream_response.ok is False + + +def test_normalize_headers_lowercases_keys(): + normalized = PrivXAPIResponse._normalize_headers( + {"Content-Type": "application/json", "X-Request-Id": "req-1"} + ) + + assert normalized == { + "content-type": "application/json", + "x-request-id": "req-1", + } + + +@pytest.mark.parametrize( + "payload, expected", + [ + (b'{"a": 1}', {"a": 1}), + (b"", {}), + (b"invalid", {}), + ], +) +def test_get_json_staticmethod(payload, expected): + assert PrivXAPIResponse._get_json(payload) == expected + + +@pytest.mark.parametrize( + "payload, expected", + [ + (None, b""), + (b"raw", b"raw"), + ("text", b"text"), + ], +) +def test_to_bytes_staticmethod(payload, expected): + assert PrivXAPIResponse._to_bytes(payload) == expected diff --git a/privx_api/trail_index.py b/privx_api/trail_index.py index f0a547b..9b5235a 100644 --- a/privx_api/trail_index.py +++ b/privx_api/trail_index.py @@ -15,7 +15,7 @@ def get_trail_index_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.TRAIL_INDEX.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_indexing_status(self, conn_id: str) -> PrivXAPIResponse: """ @@ -28,7 +28,7 @@ def get_indexing_status(self, conn_id: str) -> PrivXAPIResponse: UrlEnum.TRAIL_INDEX.CONNECTION_INDEXING_STATUS, path_params={"connection_id": conn_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def resolve_indexing_statuses(self, conn_ids: list) -> PrivXAPIResponse: """ @@ -41,7 +41,7 @@ def resolve_indexing_statuses(self, conn_ids: list) -> PrivXAPIResponse: UrlEnum.TRAIL_INDEX.CONNECTIONS_INDEXING_STATUSES, body=conn_ids, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def start_indexing(self, conn_ids: list) -> PrivXAPIResponse: """ @@ -54,7 +54,7 @@ def start_indexing(self, conn_ids: list) -> PrivXAPIResponse: UrlEnum.TRAIL_INDEX.START_INDEXING, body=conn_ids, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_index( self, @@ -79,4 +79,4 @@ def search_index( query_params=search_params, body=trails_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/user_store.py b/privx_api/user_store.py index 07ad3de..fbbab18 100644 --- a/privx_api/user_store.py +++ b/privx_api/user_store.py @@ -15,7 +15,7 @@ def get_user_store_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.USER_STORE.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_users( self, @@ -37,7 +37,7 @@ def get_users( response_status, data = self._http_get( UrlEnum.USER_STORE.USERS, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_user(self, user: dict) -> PrivXAPIResponse: """ @@ -47,7 +47,7 @@ def create_user(self, user: dict) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_post(UrlEnum.USER_STORE.USERS, body=user) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_user(self, user_id: str) -> PrivXAPIResponse: """ @@ -60,7 +60,7 @@ def get_user(self, user_id: str) -> PrivXAPIResponse: UrlEnum.USER_STORE.USER, path_params={"user_id": user_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_user(self, user_id: str, user_params: dict) -> PrivXAPIResponse: """ @@ -74,7 +74,7 @@ def update_user(self, user_id: str, user_params: dict) -> PrivXAPIResponse: path_params={"user_id": user_id}, body=user_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_user(self, user_id: str) -> PrivXAPIResponse: """ @@ -86,7 +86,7 @@ def delete_user(self, user_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.USER_STORE.USER, path_params={"user_id": user_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_user_password(self, user_id: str, user_password: dict) -> PrivXAPIResponse: """ @@ -100,7 +100,7 @@ def set_user_password(self, user_id: str, user_password: dict) -> PrivXAPIRespon path_params={"user_id": user_id}, body=user_password, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_tags( self, @@ -122,7 +122,7 @@ def get_user_tags( UrlEnum.USER_STORE.USER_TAGS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_trusted_clients(self) -> PrivXAPIResponse: """ @@ -134,7 +134,7 @@ def get_trusted_clients(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.USER_STORE.TRUSTED_CLIENTS, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_trusted_client(self, client_params: dict) -> PrivXAPIResponse: """ @@ -149,7 +149,7 @@ def create_trusted_client(self, client_params: dict) -> PrivXAPIResponse: UrlEnum.USER_STORE.TRUSTED_CLIENTS, body=client_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_trusted_client(self, client_id: str) -> PrivXAPIResponse: """ @@ -162,7 +162,7 @@ def get_trusted_client(self, client_id: str) -> PrivXAPIResponse: UrlEnum.USER_STORE.TRUSTED_CLIENT, path_params={"trusted_client_id": client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_trusted_client(self, client_id: str) -> PrivXAPIResponse: """ @@ -175,7 +175,7 @@ def delete_trusted_client(self, client_id: str) -> PrivXAPIResponse: UrlEnum.USER_STORE.TRUSTED_CLIENT, path_params={"trusted_client_id": client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_trusted_client( self, client_id: str, client_params: dict @@ -191,7 +191,7 @@ def update_trusted_client( path_params={"trusted_client_id": client_id}, body=client_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_extending_clients(self) -> PrivXAPIResponse: """ @@ -203,7 +203,7 @@ def get_extending_clients(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.USER_STORE.EXTENDING_CLIENTS, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_api_clients(self) -> PrivXAPIResponse: """ @@ -215,7 +215,7 @@ def get_api_clients(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.USER_STORE.API_CLIENTS, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_api_client(self, api_client_params: dict) -> PrivXAPIResponse: """ @@ -230,7 +230,7 @@ def create_api_client(self, api_client_params: dict) -> PrivXAPIResponse: UrlEnum.USER_STORE.API_CLIENTS, body=api_client_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_api_client(self, api_client_id: str) -> PrivXAPIResponse: """ @@ -243,7 +243,7 @@ def get_api_client(self, api_client_id: str) -> PrivXAPIResponse: UrlEnum.USER_STORE.API_CLIENT, path_params={"api_client_id": api_client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_api_client(self, api_client_id: str) -> PrivXAPIResponse: """ @@ -256,7 +256,7 @@ def delete_api_client(self, api_client_id: str) -> PrivXAPIResponse: UrlEnum.USER_STORE.API_CLIENT, path_params={"api_client_id": api_client_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_api_client( self, api_client_id: str, api_client_params: dict @@ -272,4 +272,4 @@ def update_api_client( path_params={"api_client_id": api_client_id}, body=api_client_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/vault.py b/privx_api/vault.py index abfadce..6cee9f0 100644 --- a/privx_api/vault.py +++ b/privx_api/vault.py @@ -16,7 +16,7 @@ def get_vault_service_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.VAULT.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_secret(self, secret_params: dict) -> PrivXAPIResponse: """ @@ -29,7 +29,7 @@ def create_secret(self, secret_params: dict) -> PrivXAPIResponse: UrlEnum.VAULT.SECRETS, body=secret_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def create_user_secret(self, user_id: str, secret_params: dict) -> PrivXAPIResponse: """ @@ -43,7 +43,7 @@ def create_user_secret(self, user_id: str, secret_params: dict) -> PrivXAPIRespo path_params={"user_id": user_id}, body=secret_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_secrets( self, @@ -60,7 +60,7 @@ def get_secrets( response_status, data = self._http_get( UrlEnum.VAULT.SECRETS, query_params=search_params ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_secrets( self, @@ -81,7 +81,7 @@ def get_user_secrets( path_params={"user_id": user_id, "name": name}, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_secret(self, name: str) -> PrivXAPIResponse: """ @@ -93,7 +93,7 @@ def get_secret(self, name: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.VAULT.SECRET, path_params={"name": name} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_secret(self, user_id: str, name: str) -> PrivXAPIResponse: """ @@ -105,7 +105,7 @@ def get_user_secret(self, user_id: str, name: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.VAULT.USER_SECRET, path_params={"user_id": user_id, "name": name} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_secret(self, name: str, secret_params: dict) -> PrivXAPIResponse: """ @@ -119,7 +119,7 @@ def update_secret(self, name: str, secret_params: dict) -> PrivXAPIResponse: path_params={"name": name}, body=secret_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_user_secret( self, user_id: str, name: str, secret_params: dict @@ -135,7 +135,7 @@ def update_user_secret( path_params={"user_id": user_id, "name": name}, body=secret_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_secret(self, name: str) -> PrivXAPIResponse: """ @@ -148,7 +148,7 @@ def delete_secret(self, name: str) -> PrivXAPIResponse: UrlEnum.VAULT.SECRET, path_params={"name": name}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_user_secret(self, user_id: str, name: str) -> PrivXAPIResponse: """ @@ -161,7 +161,7 @@ def delete_user_secret(self, user_id: str, name: str) -> PrivXAPIResponse: UrlEnum.VAULT.USER_SECRET, path_params={"user_id": user_id, "name": name}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_secret_metadata(self, name: str) -> PrivXAPIResponse: """ @@ -173,7 +173,7 @@ def get_secret_metadata(self, name: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.VAULT.METADATA, path_params={"name": name} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_user_secret_metadata(self, user_id: str, name: str) -> PrivXAPIResponse: """ @@ -186,7 +186,7 @@ def get_user_secret_metadata(self, user_id: str, name: str) -> PrivXAPIResponse: UrlEnum.VAULT.USER_SECRET_METADATA, path_params={"user_id": user_id, "name": name}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_secrets( self, @@ -210,7 +210,7 @@ def search_secrets( query_params=search_params, body=get_value(search_payload, dict()), ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_vault_schemas(self) -> PrivXAPIResponse: """ @@ -222,4 +222,4 @@ def get_vault_schemas(self) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.VAULT.SCHEMAS, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/privx_api/workflow_engine.py b/privx_api/workflow_engine.py index 1547a95..f450d67 100644 --- a/privx_api/workflow_engine.py +++ b/privx_api/workflow_engine.py @@ -15,7 +15,7 @@ def get_workflow_engine_status(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.WORKFLOW_ENGINE.STATUS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_workflows( self, @@ -36,7 +36,7 @@ def get_workflows( UrlEnum.WORKFLOW_ENGINE.WORKFLOWS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_workflow(self, workflow_params: dict) -> PrivXAPIResponse: """ @@ -51,7 +51,7 @@ def create_workflow(self, workflow_params: dict) -> PrivXAPIResponse: UrlEnum.WORKFLOW_ENGINE.WORKFLOWS, body=workflow_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_workflow(self, workflow_id: str) -> PrivXAPIResponse: """ @@ -63,7 +63,7 @@ def get_workflow(self, workflow_id: str) -> PrivXAPIResponse: response_status, data = self._http_get( UrlEnum.WORKFLOW_ENGINE.WORKFLOW, path_params={"workflow_id": workflow_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_workflow(self, workflow_id: str) -> PrivXAPIResponse: """ @@ -75,7 +75,7 @@ def delete_workflow(self, workflow_id: str) -> PrivXAPIResponse: response_status, data = self._http_delete( UrlEnum.WORKFLOW_ENGINE.WORKFLOW, path_params={"workflow_id": workflow_id} ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_workflow( self, workflow_id: str, workflow_params: dict @@ -91,7 +91,7 @@ def update_workflow( path_params={"workflow_id": workflow_id}, body=workflow_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_requests( self, @@ -116,7 +116,7 @@ def get_requests( UrlEnum.WORKFLOW_ENGINE.REQUESTS, query_params=search_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def create_request( self, @@ -132,7 +132,7 @@ def create_request( UrlEnum.WORKFLOW_ENGINE.REQUESTS, body=request_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.CREATED, data) + return self._api_response(response_status, HTTPStatus.CREATED, data) def get_request(self, request_id: str) -> PrivXAPIResponse: """ @@ -145,7 +145,7 @@ def get_request(self, request_id: str) -> PrivXAPIResponse: UrlEnum.WORKFLOW_ENGINE.REQUEST, path_params={"request_id": request_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def delete_request(self, request_id: str) -> PrivXAPIResponse: """ @@ -158,7 +158,7 @@ def delete_request(self, request_id: str) -> PrivXAPIResponse: UrlEnum.WORKFLOW_ENGINE.REQUEST, path_params={"request_id": request_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def set_request_decision( self, request_id: str, decision_params: dict @@ -176,7 +176,7 @@ def set_request_decision( path_params={"request_id": request_id}, body=decision_params, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def revoke_request_target_role(self, request_id: str) -> PrivXAPIResponse: """ @@ -190,7 +190,7 @@ def revoke_request_target_role(self, request_id: str) -> PrivXAPIResponse: UrlEnum.WORKFLOW_ENGINE.ROLE_REVOKE, path_params={"request_id": request_id}, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def search_requests( self, @@ -219,7 +219,7 @@ def search_requests( query_params=search_params, body=request_param, ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def get_workflow_engine_settings(self) -> PrivXAPIResponse: """ @@ -229,7 +229,7 @@ def get_workflow_engine_settings(self) -> PrivXAPIResponse: PrivXAPIResponse """ response_status, data = self._http_get(UrlEnum.WORKFLOW_ENGINE.SETTINGS) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def update_workflow_engine_settings(self, settings_param: dict) -> PrivXAPIResponse: """ @@ -241,7 +241,7 @@ def update_workflow_engine_settings(self, settings_param: dict) -> PrivXAPIRespo response_status, data = self._http_put( UrlEnum.WORKFLOW_ENGINE.SETTINGS, body=settings_param ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) def test_workflow_engine_settings(self, settings_param: dict) -> PrivXAPIResponse: """ @@ -253,4 +253,4 @@ def test_workflow_engine_settings(self, settings_param: dict) -> PrivXAPIRespons response_status, data = self._http_post( UrlEnum.WORKFLOW_ENGINE.TEST_SETTINGS, body=settings_param ) - return PrivXAPIResponse(response_status, HTTPStatus.OK, data) + return self._api_response(response_status, HTTPStatus.OK, data) diff --git a/setup.py b/setup.py index cd7919f..e0b22b1 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup( name="privx_api", - version="42.0.0", + version="42.0.1", packages=["privx_api"], license="Apache Licence 2.0", url="https://github.com/SSHcom/privx-sdk-for-python",