From 81067960c19b8e79fcfbe458436175aa6eff9d1e Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Tue, 20 Jan 2026 07:45:40 +0100 Subject: [PATCH 01/13] feat: add FPNV --- firebase_admin/fpnv.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 firebase_admin/fpnv.py diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py new file mode 100644 index 00000000..21fd49cb --- /dev/null +++ b/firebase_admin/fpnv.py @@ -0,0 +1,19 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Firebase Phone Number Verification (FPNV) service. + +This module contains functions for verifying JWTs used for +authenticating against Firebase services. +""" From b6d06899b728dc5ec2bac1507f48b708ac1ac357 Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Tue, 20 Jan 2026 16:11:27 +0100 Subject: [PATCH 02/13] feat: add basic logic --- firebase_admin/fpnv.py | 216 ++++++++++++++++++++++++++++++++++++++++- tests/test_fpnv.py | 18 ++++ 2 files changed, 230 insertions(+), 4 deletions(-) create mode 100644 tests/test_fpnv.py diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index 21fd49cb..4a212288 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -12,8 +12,216 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Firebase Phone Number Verification (FPNV) service. +"""Firebase Phone Number Verification (FPNV) module.""" +from typing import Any, Dict -This module contains functions for verifying JWTs used for -authenticating against Firebase services. -""" +import jwt +from jwt import PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, \ + InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError + +from firebase_admin import _utils +from firebase_admin.exceptions import InvalidArgumentError + +_FPNV_ATTRIBUTE = '_fpnv' +_FPNV_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' +_FPNV_ISSUER = 'https://fpnv.googleapis.com/projects/' +_ALGORITHM_ES256 = 'ES256' + + +def client(app=None): + """Returns an instance of the FPNV service for the specified app. + + Args: + app: An App instance (optional). + + Returns: + FpnvClient: A FpnvClient instance. + + Raises: + ValueError: If the app is not a valid App instance. + """ + return _utils.get_app_service(app, _FPNV_ATTRIBUTE, FpnvClient) + + +class FpnvToken(dict): + """Represents a verified FPNV token. + + This class behaves like a dictionary, allowing access to the decoded claims. + It also provides convenience properties for common claims. + """ + + def __init__(self, claims): + super(FpnvToken, self).__init__(claims) + + @property + def phone_number(self): + """Returns the phone number associated with the token.""" + return self.get('sub') + + @property + def issuer(self): + """Returns the issuer of the token.""" + return self.get('iss') + + @property + def audience(self): + """Returns the audience of the token.""" + return self.get('aud') + + @property + def sub(self): + """Returns the sub (subject) of the token, which is the phone number.""" + return self.get('sub') + + # TODO: ADD ALL + + +class FpnvClient: + """The client for the Firebase Phone Number Verification service.""" + _project_id = None + + def __init__(self, app): + """Initializes the FpnvClient. + + Args: + app: A firebase_admin.App instance. + + Raises: + ValueError: If the app is invalid or lacks a project ID. + """ + self._project_id = app.project_id + + if not self._project_id: + cred = app.credential.get_credential() + if hasattr(cred, 'project_id'): + self._project_id = cred.project_id + + if not self._project_id: + raise ValueError( + 'Project ID is required for FPNV. Please ensure the app is ' + 'initialized with a credential that contains a project ID.' + ) + + self._verifier = _FpnvTokenVerifier(self._project_id) + + def verify_token(self, token) -> FpnvToken: + """Verifies the given FPNV token. + + Verifies the signature, expiration, and claims of the token. + + Args: + token: A string containing the FPNV JWT. + + Returns: + FpnvToken: The verified token claims. + + Raises: + ValueError: If the token is invalid or malformed. + firebase_admin.exceptions.InvalidArgumentError: If verification fails. + """ + try: + claims = self._verifier.verify(token) + return FpnvToken(claims) + except Exception as error: + raise InvalidArgumentError( + 'Failed to verify token: {0}'.format(error) + ) + + +class _FpnvTokenVerifier: + """Internal class for verifying FPNV JWTs signed with ES256.""" + _jwks_client = None + _project_id = None + + def __init__(self, project_id): + self._project_id = project_id + self._jwks_client = PyJWKClient(_FPNV_JWKS_URL, lifespan=21600) + + def verify(self, token) -> Dict[str, Any]: + _Validators.check_string("FPNV check token", token) + try: + self._validate_headers(jwt.get_unverified_header(token)) + signing_key = self._jwks_client.get_signing_key_from_jwt(token) + claims = self._validate_payload(token, signing_key.key) + except (InvalidTokenError, DecodeError) as exception: + raise ValueError( + f'Verifying FPNV token failed. Error: {exception}' + ) from exception + + return claims + + def _validate_headers(self, headers: Any) -> None: + if headers.get('kid') is None: + raise ValueError("FPNV has no 'kid' claim.") + + if headers.get('typ') != 'JWT': + raise ValueError("The provided FPNV token has an incorrect type header") + + algorithm = headers.get('alg') + if algorithm != _ALGORITHM_ES256: + raise ValueError( + 'The provided FPNV token has an incorrect alg header. ' + f'Expected {_ALGORITHM_ES256} but got {algorithm}.' + ) + + def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: + """Decodes and verifies the token.""" + _issuer = None + payload = {} + try: + unsafe_payload = jwt.decode(token, options={"verify_signature": False}) + _issuer = unsafe_payload.get('iss') + + if _issuer is None: + raise ValueError('The provided FPNV token has no issuer.') + payload = jwt.decode( + token, + signing_key, + algorithms=[_ALGORITHM_ES256], + audience=_issuer + ) + except InvalidSignatureError as exception: + raise ValueError( + 'The provided FPNV token has an invalid signature.' + ) from exception + except InvalidAudienceError as exception: + raise ValueError( + 'The provided FPNV token has an incorrect "aud" (audience) claim. ' + f'Expected payload to include {_issuer}.' + ) from exception + except InvalidIssuerError as exception: + raise ValueError( + 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' + f'Expected claim to include {_issuer}' + ) from exception + except ExpiredSignatureError as exception: + raise ValueError( + 'The provided FPNV token has expired.' + ) from exception + except InvalidTokenError as exception: + raise ValueError( + f'Decoding FPNV token failed. Error: {exception}' + ) from exception + + if not payload.get('iss').startswith(_FPNV_ISSUER): + raise ValueError('Token does not contain the correct "iss" (issuer).') + _Validators.check_string( + 'The provided FPNV token "sub" (subject) claim', + payload.get('sub')) + + return payload + + +class _Validators: + """A collection of data validation utilities. + + Methods provided in this class raise ``ValueErrors`` if any validations fail. + """ + + @classmethod + def check_string(cls, label: str, value: Any): + """Checks if the given value is a string.""" + if value is None: + raise ValueError(f'{label} "{value}" must be a non-empty string.') + if not isinstance(value, str): + raise ValueError(f'{label} "{value}" must be a string.') diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py new file mode 100644 index 00000000..818238a4 --- /dev/null +++ b/tests/test_fpnv.py @@ -0,0 +1,18 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test cases for the firebase_admin.fpnv module.""" + +class TestVerifyToken: + pass \ No newline at end of file From c22cb8c0ff36dfe8eadf40e2a6d51f6b2f158d28 Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Wed, 21 Jan 2026 12:53:26 +0100 Subject: [PATCH 03/13] chore: resolve robot comments --- firebase_admin/fpnv.py | 53 ++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index 4a212288..a2ca7c02 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -12,7 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Firebase Phone Number Verification (FPNV) module.""" +"""Firebase Phone Number Verification (FPNV) module. + +This module contains functions for verifying JWTs related to the Firebase +Phone Number Verification (FPNV) service. +""" from typing import Any, Dict import jwt @@ -55,25 +59,40 @@ def __init__(self, claims): @property def phone_number(self): - """Returns the phone number associated with the token.""" + """Returns the phone number of the user. + This corresponds to the 'sub' claim in the JWT. + """ return self.get('sub') @property def issuer(self): - """Returns the issuer of the token.""" + """Returns the issuer identifier for the issuer of the response.""" return self.get('iss') @property def audience(self): - """Returns the audience of the token.""" + """Returns the audience for which this token is intended.""" return self.get('aud') + @property + def exp(self): + """Returns the expiration time since the Unix epoch.""" + return self.get('exp') + + @property + def iat(self): + """Returns the issued-at time since the Unix epoch.""" + return self.get('iat') + @property def sub(self): """Returns the sub (subject) of the token, which is the phone number.""" return self.get('sub') - # TODO: ADD ALL + @property + def claims(self): + """Returns the entire map of claims.""" + return self class FpnvClient: @@ -122,9 +141,9 @@ def verify_token(self, token) -> FpnvToken: try: claims = self._verifier.verify(token) return FpnvToken(claims) - except Exception as error: + except ValueError as error: raise InvalidArgumentError( - 'Failed to verify token: {0}'.format(error) + 'Failed to verify token: {0}'.format(error), cause=error ) @@ -166,19 +185,15 @@ def _validate_headers(self, headers: Any) -> None: def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: """Decodes and verifies the token.""" - _issuer = None + expected_issuer = f'{_FPNV_ISSUER}{self._project_id}' payload = {} try: - unsafe_payload = jwt.decode(token, options={"verify_signature": False}) - _issuer = unsafe_payload.get('iss') - - if _issuer is None: - raise ValueError('The provided FPNV token has no issuer.') payload = jwt.decode( token, signing_key, algorithms=[_ALGORITHM_ES256], - audience=_issuer + audience=expected_issuer, + issuer=expected_issuer ) except InvalidSignatureError as exception: raise ValueError( @@ -187,12 +202,12 @@ def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: except InvalidAudienceError as exception: raise ValueError( 'The provided FPNV token has an incorrect "aud" (audience) claim. ' - f'Expected payload to include {_issuer}.' + f'Expected payload to include {expected_issuer}.' ) from exception except InvalidIssuerError as exception: raise ValueError( 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' - f'Expected claim to include {_issuer}' + f'Expected claim to include {expected_issuer}' ) from exception except ExpiredSignatureError as exception: raise ValueError( @@ -221,7 +236,5 @@ class _Validators: @classmethod def check_string(cls, label: str, value: Any): """Checks if the given value is a string.""" - if value is None: - raise ValueError(f'{label} "{value}" must be a non-empty string.') - if not isinstance(value, str): - raise ValueError(f'{label} "{value}" must be a string.') + if not isinstance(value, str) or not value: + raise ValueError(f'{label} must be a non-empty string.') From 7ebc65273e7fd10fefd3f8b612caf778940bd0e6 Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Wed, 21 Jan 2026 14:09:59 +0100 Subject: [PATCH 04/13] chore: add tests --- tests/test_fpnv.py | 197 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 196 insertions(+), 1 deletion(-) diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index 818238a4..3dc2bebc 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -14,5 +14,200 @@ """Test cases for the firebase_admin.fpnv module.""" +import json +from datetime import time + +import jwt +import pytest +from unittest import mock + +import firebase_admin +from firebase_admin import fpnv +from firebase_admin import _utils +from tests import testutils + +# Mock Data +_PROJECT_ID = 'mock-project-id' +_FPNV_TOKEN = 'fpnv_token_string' +_EXP_TIMESTAMP = 2000000000 +_ISSUER = f'https://fpnv.googleapis.com/projects/{_PROJECT_ID}' +_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' +_PHONE_NUMBER = '+1234567890' +_ISSUER_PREFIX = 'https://fpnv.googleapis.com/projects/' +_PRIVATE_KEY = 'test-private-key' # In real tests, use a real RSA/EC private key +_PUBLIC_KEY = 'test-public-key' # In real tests, use the corresponding public key + +_MOCK_PAYLOAD = { + 'iss': _ISSUER, + 'sub': '+1234567890', + 'aud': [_ISSUER], + 'exp': _EXP_TIMESTAMP, + 'iat': _EXP_TIMESTAMP - 3600, + "other": 'other' +} + + +@pytest.fixture +def app(): + cred = testutils.MockCredential() + return firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}) + + +@pytest.fixture +def client(app): + return fpnv.client(app) + + +class TestCommon: + @classmethod + def teardown_class(cls): + testutils.cleanup_apps() + + +class TestFpnvToken(TestCommon): + def test_properties(self): + token = fpnv.FpnvToken(_MOCK_PAYLOAD) + + assert token.phone_number == _PHONE_NUMBER + assert token.sub == _PHONE_NUMBER + assert token.issuer == _ISSUER + assert token.audience == [_ISSUER] + assert token.exp == _MOCK_PAYLOAD['exp'] + assert token.iat == _MOCK_PAYLOAD['iat'] + assert token.claims == _MOCK_PAYLOAD + assert token['other'] == _MOCK_PAYLOAD['other'] + + +class TestFpnvClient(TestCommon): + + def test_client_no_app(self): + with mock.patch('firebase_admin._utils.get_app_service') as mock_get_service: + fpnv.client() + mock_get_service.assert_called_once() + with pytest.raises(ValueError): + fpnv.client() + + def test_client(self, app): + client = fpnv.client(app) + assert isinstance(client, fpnv.FpnvClient) + assert client._project_id == _PROJECT_ID + + def test_requires_project_id(self): + cred = testutils.MockCredential() + # Create app without project ID + app = firebase_admin.initialize_app(cred, name='no_project_id') + # Mock credential to not have project_id + app.credential.get_credential().project_id = None + + with pytest.raises(ValueError, match='Project ID is required'): + fpnv.client(app) + + def test_client_default_app(self): + client = fpnv.client() + assert isinstance(client, fpnv.FpnvClient) + + def test_client_explicit_app(self): + cred = testutils.MockCredential() + app = firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}, name='custom') + client = fpnv.client(app) + assert isinstance(client, fpnv.FpnvClient) + + class TestVerifyToken: - pass \ No newline at end of file + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, client): + token_str = 'valid.token.string' + # Mock Header + mock_header.return_value = {'kid': 'key1', 'typ': 'JWT', 'alg': 'ES256'} + + # Mock Signing Key + mock_jwks_instance = mock_jwks_cls.return_value + mock_signing_key = mock.Mock() + mock_signing_key.key = _PUBLIC_KEY + mock_jwks_instance.get_signing_key_from_jwt.return_value = mock_signing_key + + mock_decode.return_value = _MOCK_PAYLOAD + + # Execute + token = client.verify_token(token_str) + + # Verify + assert isinstance(token, fpnv.FpnvToken) + assert token.phone_number == _PHONE_NUMBER + + mock_header.assert_called_with(token_str) + mock_jwks_instance.get_signing_key_from_jwt.assert_called_with(token_str) + mock_decode.assert_called_with( + token_str, + _PUBLIC_KEY, + algorithms=['ES256'], + audience=_ISSUER, + issuer=_ISSUER + ) + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_no_kid(self, mock_header, client): + mock_header.return_value = {'typ': 'JWT', 'alg': 'ES256'} # Missing kid + with pytest.raises(ValueError, match="no 'kid' claim"): + client.verify_token('token') + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_wrong_alg(self, mock_header, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'RS256'} # Wrong alg + with pytest.raises(ValueError, match="incorrect alg"): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_jwk_error(self, mock_header, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + # Simulate Key not found or other PyJWKClient error + mock_jwks_instance.get_signing_key_from_jwt.side_effect = jwt.PyJWKClientError("Key not found") + + with pytest.raises(ValueError, match="Verifying FPNV token failed"): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + + # Simulate ExpiredSignatureError + mock_decode.side_effect = jwt.ExpiredSignatureError("Expired") + + with pytest.raises(ValueError, match="token has expired"): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_audience(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + + # Simulate InvalidAudienceError + mock_decode.side_effect = jwt.InvalidAudienceError("Wrong Aud") + + with pytest.raises(ValueError, match="incorrect \"aud\""): + client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + + # Simulate InvalidIssuerError + mock_decode.side_effect = jwt.InvalidIssuerError("Wrong Iss") + + with pytest.raises(ValueError, match="incorrect \"iss\""): + client.verify_token('token') From 03353e9828c1a8f2e150287c9a5a961fbdb673eb Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Wed, 21 Jan 2026 15:57:20 +0100 Subject: [PATCH 05/13] chore: update tests --- tests/test_fpnv.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index 3dc2bebc..ea8c358c 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -14,16 +14,13 @@ """Test cases for the firebase_admin.fpnv module.""" -import json -from datetime import time +from unittest import mock import jwt import pytest -from unittest import mock import firebase_admin from firebase_admin import fpnv -from firebase_admin import _utils from tests import testutils # Mock Data @@ -114,6 +111,7 @@ def test_client_explicit_app(self): class TestVerifyToken: + @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') @@ -165,7 +163,8 @@ def test_verify_token_jwk_error(self, mock_header, mock_jwks_cls, client): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value # Simulate Key not found or other PyJWKClient error - mock_jwks_instance.get_signing_key_from_jwt.side_effect = jwt.PyJWKClientError("Key not found") + mock_jwks_instance.get_signing_key_from_jwt.side_effect = jwt.PyJWKClientError( + "Key not found") with pytest.raises(ValueError, match="Verifying FPNV token failed"): client.verify_token('token') From 839a4971036596ff00a4ad0b557824fb73f0da80 Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Wed, 21 Jan 2026 17:13:17 +0100 Subject: [PATCH 06/13] chore: update tests --- firebase_admin/fpnv.py | 11 ++--------- tests/test_fpnv.py | 44 ++++++++++++++++++++++-------------------- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index a2ca7c02..7be5547c 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -136,15 +136,8 @@ def verify_token(self, token) -> FpnvToken: Raises: ValueError: If the token is invalid or malformed. - firebase_admin.exceptions.InvalidArgumentError: If verification fails. """ - try: - claims = self._verifier.verify(token) - return FpnvToken(claims) - except ValueError as error: - raise InvalidArgumentError( - 'Failed to verify token: {0}'.format(error), cause=error - ) + return FpnvToken(self._verifier.verify(token)) class _FpnvTokenVerifier: @@ -237,4 +230,4 @@ class _Validators: def check_string(cls, label: str, value: Any): """Checks if the given value is a string.""" if not isinstance(value, str) or not value: - raise ValueError(f'{label} must be a non-empty string.') + raise ValueError(f'{label} must be a non-empty string.') \ No newline at end of file diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index ea8c358c..24eb78a4 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -14,6 +14,7 @@ """Test cases for the firebase_admin.fpnv module.""" +import base64 from unittest import mock import jwt @@ -43,25 +44,25 @@ "other": 'other' } - -@pytest.fixture -def app(): - cred = testutils.MockCredential() - return firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}) - - @pytest.fixture -def client(app): +def client(): + app = firebase_admin.get_app() return fpnv.client(app) class TestCommon: + @classmethod + def setup_class(cls): + cred = testutils.MockCredential() + firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}) + + @classmethod def teardown_class(cls): testutils.cleanup_apps() -class TestFpnvToken(TestCommon): +class TestFpnvToken: def test_properties(self): token = fpnv.FpnvToken(_MOCK_PAYLOAD) @@ -77,14 +78,8 @@ def test_properties(self): class TestFpnvClient(TestCommon): - def test_client_no_app(self): - with mock.patch('firebase_admin._utils.get_app_service') as mock_get_service: - fpnv.client() - mock_get_service.assert_called_once() - with pytest.raises(ValueError): - fpnv.client() - - def test_client(self, app): + def test_client(self): + app = firebase_admin.get_app() client = fpnv.client(app) assert isinstance(client, fpnv.FpnvClient) assert client._project_id == _PROJECT_ID @@ -110,7 +105,7 @@ def test_client_explicit_app(self): assert isinstance(client, fpnv.FpnvClient) -class TestVerifyToken: +class TestVerifyToken(TestCommon): @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @@ -125,6 +120,7 @@ def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, cli mock_signing_key = mock.Mock() mock_signing_key.key = _PUBLIC_KEY mock_jwks_instance.get_signing_key_from_jwt.return_value = mock_signing_key + client._verifier._jwks_client = mock_jwks_instance mock_decode.return_value = _MOCK_PAYLOAD @@ -146,9 +142,11 @@ def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, cli ) @mock.patch('jwt.get_unverified_header') - def test_verify_token_no_kid(self, mock_header, client): + def test_verify_token_no_kid(self, mock_header): + app = firebase_admin.get_app() + client = fpnv.client(app) mock_header.return_value = {'typ': 'JWT', 'alg': 'ES256'} # Missing kid - with pytest.raises(ValueError, match="no 'kid' claim"): + with pytest.raises(ValueError, match="FPNV has no 'kid' claim."): client.verify_token('token') @mock.patch('jwt.get_unverified_header') @@ -176,6 +174,8 @@ def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls, cli mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance + # Simulate ExpiredSignatureError mock_decode.side_effect = jwt.ExpiredSignatureError("Expired") @@ -190,6 +190,7 @@ def test_verify_token_invalid_audience(self, mock_header, mock_decode, mock_jwks mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance # Simulate InvalidAudienceError mock_decode.side_effect = jwt.InvalidAudienceError("Wrong Aud") @@ -204,9 +205,10 @@ def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_c mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance # Simulate InvalidIssuerError mock_decode.side_effect = jwt.InvalidIssuerError("Wrong Iss") with pytest.raises(ValueError, match="incorrect \"iss\""): - client.verify_token('token') + client.verify_token('token') \ No newline at end of file From 966b31354c49752e42c1dd4b041d5f7234043e16 Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Wed, 21 Jan 2026 17:55:49 +0100 Subject: [PATCH 07/13] chore: update tests --- firebase_admin/fpnv.py | 7 +++---- tests/test_fpnv.py | 30 ++++++++++++++++-------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index 7be5547c..2bfc7df9 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -21,10 +21,9 @@ import jwt from jwt import PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, \ - InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError + PyJWKClientError, InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError from firebase_admin import _utils -from firebase_admin.exceptions import InvalidArgumentError _FPNV_ATTRIBUTE = '_fpnv' _FPNV_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' @@ -155,7 +154,7 @@ def verify(self, token) -> Dict[str, Any]: self._validate_headers(jwt.get_unverified_header(token)) signing_key = self._jwks_client.get_signing_key_from_jwt(token) claims = self._validate_payload(token, signing_key.key) - except (InvalidTokenError, DecodeError) as exception: + except (InvalidTokenError, DecodeError, PyJWKClientError) as exception: raise ValueError( f'Verifying FPNV token failed. Error: {exception}' ) from exception @@ -230,4 +229,4 @@ class _Validators: def check_string(cls, label: str, value: Any): """Checks if the given value is a string.""" if not isinstance(value, str) or not value: - raise ValueError(f'{label} must be a non-empty string.') \ No newline at end of file + raise ValueError(f'{label} must be a non-empty string.') diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index 24eb78a4..4dbd57cc 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -14,7 +14,6 @@ """Test cases for the firebase_admin.fpnv module.""" -import base64 from unittest import mock import jwt @@ -44,6 +43,7 @@ "other": 'other' } + @pytest.fixture def client(): app = firebase_admin.get_app() @@ -56,7 +56,6 @@ def setup_class(cls): cred = testutils.MockCredential() firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}) - @classmethod def teardown_class(cls): testutils.cleanup_apps() @@ -155,17 +154,21 @@ def test_verify_token_wrong_alg(self, mock_header, client): with pytest.raises(ValueError, match="incorrect alg"): client.verify_token('token') - @mock.patch('jwt.PyJWKClient') - @mock.patch('jwt.get_unverified_header') - def test_verify_token_jwk_error(self, mock_header, mock_jwks_cls, client): - mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} - mock_jwks_instance = mock_jwks_cls.return_value - # Simulate Key not found or other PyJWKClient error - mock_jwks_instance.get_signing_key_from_jwt.side_effect = jwt.PyJWKClientError( - "Key not found") + def test_verify_token_jwk_error(self, client): + # Access the ACTUAL client instance used by the verifier + # (Assuming internal structure: client -> _verifier -> _jwks_client) + jwks_client = client._verifier._jwks_client - with pytest.raises(ValueError, match="Verifying FPNV token failed"): - client.verify_token('token') + # Mock the method on the existing instance + with mock.patch.object(jwks_client, 'get_signing_key_from_jwt') as mock_method: + mock_method.side_effect = jwt.PyJWKClientError("Key not found") + + # Mock header is still needed if _get_signing_key calls it before the client + with mock.patch('jwt.get_unverified_header') as mock_header: + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + + with pytest.raises(ValueError, match="Verifying FPNV token failed"): + client.verify_token('token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @@ -176,7 +179,6 @@ def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls, cli mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY client._verifier._jwks_client = mock_jwks_instance - # Simulate ExpiredSignatureError mock_decode.side_effect = jwt.ExpiredSignatureError("Expired") @@ -211,4 +213,4 @@ def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_c mock_decode.side_effect = jwt.InvalidIssuerError("Wrong Iss") with pytest.raises(ValueError, match="incorrect \"iss\""): - client.verify_token('token') \ No newline at end of file + client.verify_token('token') From b7240995b2f2ada80cc40f2fbaa884d9ad354eb3 Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Wed, 21 Jan 2026 18:23:29 +0100 Subject: [PATCH 08/13] chore: resolve comments --- firebase_admin/fpnv.py | 3 --- tests/test_fpnv.py | 4 ---- 2 files changed, 7 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index 2bfc7df9..d0374859 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -178,7 +178,6 @@ def _validate_headers(self, headers: Any) -> None: def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: """Decodes and verifies the token.""" expected_issuer = f'{_FPNV_ISSUER}{self._project_id}' - payload = {} try: payload = jwt.decode( token, @@ -210,8 +209,6 @@ def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: f'Decoding FPNV token failed. Error: {exception}' ) from exception - if not payload.get('iss').startswith(_FPNV_ISSUER): - raise ValueError('Token does not contain the correct "iss" (issuer).') _Validators.check_string( 'The provided FPNV token "sub" (subject) claim', payload.get('sub')) diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index 4dbd57cc..21e85c23 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -25,13 +25,9 @@ # Mock Data _PROJECT_ID = 'mock-project-id' -_FPNV_TOKEN = 'fpnv_token_string' _EXP_TIMESTAMP = 2000000000 _ISSUER = f'https://fpnv.googleapis.com/projects/{_PROJECT_ID}' -_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' _PHONE_NUMBER = '+1234567890' -_ISSUER_PREFIX = 'https://fpnv.googleapis.com/projects/' -_PRIVATE_KEY = 'test-private-key' # In real tests, use a real RSA/EC private key _PUBLIC_KEY = 'test-public-key' # In real tests, use the corresponding public key _MOCK_PAYLOAD = { From 61e10e50b12393e94095424044b2196482e967ec Mon Sep 17 00:00:00 2001 From: Andrii Boiko Date: Fri, 23 Jan 2026 10:23:46 +0100 Subject: [PATCH 09/13] chore: resolve lint and add more tests --- firebase_admin/fpnv.py | 4 ++- tests/test_fpnv.py | 64 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index d0374859..5ca3ba99 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -54,7 +54,7 @@ class FpnvToken(dict): """ def __init__(self, claims): - super(FpnvToken, self).__init__(claims) + super().__init__(claims) @property def phone_number(self): @@ -149,6 +149,7 @@ def __init__(self, project_id): self._jwks_client = PyJWKClient(_FPNV_JWKS_URL, lifespan=21600) def verify(self, token) -> Dict[str, Any]: + """Verifies the given FPNV token.""" _Validators.check_string("FPNV check token", token) try: self._validate_headers(jwt.get_unverified_header(token)) @@ -162,6 +163,7 @@ def verify(self, token) -> Dict[str, Any]: return claims def _validate_headers(self, headers: Any) -> None: + """Validates the headers.""" if headers.get('kid') is None: raise ValueError("FPNV has no 'kid' claim.") diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index 21e85c23..2013beb2 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -14,10 +14,14 @@ """Test cases for the firebase_admin.fpnv module.""" +import base64 +import time from unittest import mock +from unittest.mock import patch import jwt import pytest +from cryptography.hazmat.primitives.asymmetric import ec import firebase_admin from firebase_admin import fpnv @@ -29,10 +33,13 @@ _ISSUER = f'https://fpnv.googleapis.com/projects/{_PROJECT_ID}' _PHONE_NUMBER = '+1234567890' _PUBLIC_KEY = 'test-public-key' # In real tests, use the corresponding public key +_ALGORITHM = 'ES256' +_KEY_ID = 'test-key-id' +_TYPE = 'JWT' _MOCK_PAYLOAD = { 'iss': _ISSUER, - 'sub': '+1234567890', + 'sub': _PHONE_NUMBER, 'aud': [_ISSUER], 'exp': _EXP_TIMESTAMP, 'iat': _EXP_TIMESTAMP - 3600, @@ -102,6 +109,61 @@ def test_client_explicit_app(self): class TestVerifyToken(TestCommon): + def test_verify_token_with_real_crypto(self): + """Verifies a token signed with a real ES256 key pair. + + Mocking only the JWKS endpoint. + This ensures the cryptographic verification logic is functioning correctly. + """ + # Generate a real ES256 key pair + private_key = ec.generate_private_key(ec.SECP256R1()) + public_key = private_key.public_key() + + # Create the JWK representation of the public key (for the mock endpoint) + # Note: Retrieving numbers from the key involves cryptography primitives + public_numbers = public_key.public_numbers() + + def to_b64url(b_data): + return base64.urlsafe_b64encode(b_data).rstrip(b'=').decode('utf-8') + + jwk = { + "kty": "EC", + "use": "sig", + "alg": _ALGORITHM, + "kid": _KEY_ID, + "crv": "P-256", + "x": to_b64url(public_numbers.x.to_bytes(32, 'big')), + "y": to_b64url(public_numbers.y.to_bytes(32, 'big')), + } + now = int(time.time()) + payload = { + 'iss': _ISSUER, + 'aud': [_ISSUER], + 'iat': now, + 'exp': now + 3600, + 'sub': _PHONE_NUMBER + } + + # Sign using the private key object directly (PyJWT supports this) + token = jwt.encode( + payload, + private_key, + algorithm=_ALGORITHM, + headers={'alg': _ALGORITHM, 'typ': _TYPE, 'kid': _KEY_ID}, + ) + + # Mock PyJWKClient fetch_data + with patch('jwt.PyJWKClient.fetch_data') as mock_fetch: + mock_fetch.return_value = {'keys': [jwk]} + + app = firebase_admin.get_app() + client = fpnv.client(app) + decoded_token = client.verify_token(token) + + assert decoded_token['sub'] == _PHONE_NUMBER + assert _ISSUER in decoded_token['aud'] + assert decoded_token.phone_number == decoded_token['sub'] + @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') From 1dee2e55a6977fce5eb32e36c4a5ef5fee17dbb5 Mon Sep 17 00:00:00 2001 From: tomaszjaniewicz-gl Date: Tue, 10 Feb 2026 16:29:50 +0100 Subject: [PATCH 10/13] add tests to test_fpnv.py to increase coverage to 100% --- tests/test_fpnv.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index 2013beb2..bd0e3e7e 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -106,6 +106,13 @@ def test_client_explicit_app(self): client = fpnv.client(app) assert isinstance(client, fpnv.FpnvClient) + def test_client_illegal_app_argument_wrong_app_type(self): + cred = testutils.MockCredential() + app = "" + with pytest.raises(ValueError, match = 'Illegal app argument. Argument must be of type firebase_admin.App, but given ' + f'"{type(app)}".'): + client = fpnv.client(app) + class TestVerifyToken(TestCommon): @@ -198,6 +205,14 @@ def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, cli issuer=_ISSUER ) + @mock.patch('jwt.get_unverified_header') + def test_verify_token_no_name(self, mock_header): + app = firebase_admin.get_app() + client = fpnv.client(app) + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + with pytest.raises(ValueError, match="must be a non-empty string"): + client.verify_token('') + @mock.patch('jwt.get_unverified_header') def test_verify_token_no_kid(self, mock_header): app = firebase_admin.get_app() @@ -212,6 +227,12 @@ def test_verify_token_wrong_alg(self, mock_header, client): with pytest.raises(ValueError, match="incorrect alg"): client.verify_token('token') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_wrong_typ(self, mock_header, client): + mock_header.return_value = {'kid': 'k', 'typ': 'WRONG', 'alg': 'ES256'} # wrong typ + with pytest.raises(ValueError, match="incorrect type header"): + client.verify_token('token') + def test_verify_token_jwk_error(self, client): # Access the ACTUAL client instance used by the verifier # (Assuming internal structure: client -> _verifier -> _jwks_client) @@ -243,6 +264,21 @@ def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls, cli with pytest.raises(ValueError, match="token has expired"): client.verify_token('token') + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_signature(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidSignatureError + mock_decode.side_effect = jwt.InvalidSignatureError("Wrong Signature") + + with pytest.raises(ValueError, match="invalid signature"): + client.verify_token('token') + @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') @@ -272,3 +308,18 @@ def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_c with pytest.raises(ValueError, match="incorrect \"iss\""): client.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_token(self, mock_header, mock_decode, mock_jwks_cls, client): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + client._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidTokenError + mock_decode.side_effect = jwt.InvalidTokenError("Decoding FPNV token failed") + + with pytest.raises(ValueError, match="incorrect \"iss\""): + client.verify_token('token') \ No newline at end of file From e25c7de5cd78ec1618fc0578fbc98d54cefae0b5 Mon Sep 17 00:00:00 2001 From: jonathanedey Date: Mon, 23 Mar 2026 15:51:30 -0400 Subject: [PATCH 11/13] fix: Modified api usage to match approved specs --- firebase_admin/fpnv.py | 88 +++++++++++++++---------- tests/test_fpnv.py | 146 +++++++++++++++++------------------------ 2 files changed, 114 insertions(+), 120 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index 5ca3ba99..28318700 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -17,13 +17,16 @@ This module contains functions for verifying JWTs related to the Firebase Phone Number Verification (FPNV) service. """ -from typing import Any, Dict +from __future__ import annotations +from typing import Any, Dict, Optional import jwt -from jwt import PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, \ +from jwt import ( + PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, PyJWKClientError, InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError +) -from firebase_admin import _utils +from firebase_admin import App, _utils, exceptions _FPNV_ATTRIBUTE = '_fpnv' _FPNV_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' @@ -31,19 +34,24 @@ _ALGORITHM_ES256 = 'ES256' -def client(app=None): - """Returns an instance of the FPNV service for the specified app. +def _get_fpnv_service(app): + return _utils.get_app_service(app, _FPNV_ATTRIBUTE, _FpnvService) + +def verify_token(token: str, app: Optional[App] = None) -> FpnvToken: + """Verifies a Firebase Phone Number Verification (FPNV) token. Args: + token: A string containing the FPNV JWT. app: An App instance (optional). Returns: - FpnvClient: A FpnvClient instance. + FpnvToken: The verified token claims. Raises: - ValueError: If the app is not a valid App instance. + InvalidFpnvTokenError: If the token is invalid or malformed. + ExpiredFpnvTokenError: If the token has expired. """ - return _utils.get_app_service(app, _FPNV_ATTRIBUTE, FpnvClient) + return _get_fpnv_service(app).verify_token(token) class FpnvToken(dict): @@ -55,36 +63,37 @@ class FpnvToken(dict): def __init__(self, claims): super().__init__(claims) + self['phone_number'] = claims.get('sub') @property - def phone_number(self): + def phone_number(self) -> str: """Returns the phone number of the user. This corresponds to the 'sub' claim in the JWT. """ return self.get('sub') @property - def issuer(self): + def issuer(self) -> str: """Returns the issuer identifier for the issuer of the response.""" return self.get('iss') @property - def audience(self): + def audience(self) -> str: """Returns the audience for which this token is intended.""" return self.get('aud') @property - def exp(self): + def exp(self) -> int: """Returns the expiration time since the Unix epoch.""" return self.get('exp') @property - def iat(self): + def iat(self) -> int: """Returns the issued-at time since the Unix epoch.""" return self.get('iat') @property - def sub(self): + def sub(self) -> str: """Returns the sub (subject) of the token, which is the phone number.""" return self.get('sub') @@ -94,19 +103,11 @@ def claims(self): return self -class FpnvClient: - """The client for the Firebase Phone Number Verification service.""" +class _FpnvService: + """Service class that implements Firebase Phone Number Verification functionality.""" _project_id = None def __init__(self, app): - """Initializes the FpnvClient. - - Args: - app: A firebase_admin.App instance. - - Raises: - ValueError: If the app is invalid or lacks a project ID. - """ self._project_id = app.project_id if not self._project_id: @@ -154,10 +155,12 @@ def verify(self, token) -> Dict[str, Any]: try: self._validate_headers(jwt.get_unverified_header(token)) signing_key = self._jwks_client.get_signing_key_from_jwt(token) - claims = self._validate_payload(token, signing_key.key) + claims = self._decode_and_verify(token, signing_key.key) except (InvalidTokenError, DecodeError, PyJWKClientError) as exception: - raise ValueError( - f'Verifying FPNV token failed. Error: {exception}' + raise InvalidFpnvTokenError( + 'Verifying FPNV token failed.', + cause=exception, + http_response=getattr(exception, 'http_response', None) ) from exception return claims @@ -165,19 +168,19 @@ def verify(self, token) -> Dict[str, Any]: def _validate_headers(self, headers: Any) -> None: """Validates the headers.""" if headers.get('kid') is None: - raise ValueError("FPNV has no 'kid' claim.") + raise InvalidFpnvTokenError("FPNV has no 'kid' claim.") if headers.get('typ') != 'JWT': - raise ValueError("The provided FPNV token has an incorrect type header") + raise InvalidFpnvTokenError("The provided FPNV token has an incorrect type header") algorithm = headers.get('alg') if algorithm != _ALGORITHM_ES256: - raise ValueError( + raise InvalidFpnvTokenError( 'The provided FPNV token has an incorrect alg header. ' f'Expected {_ALGORITHM_ES256} but got {algorithm}.' ) - def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: + def _decode_and_verify(self, token, signing_key) -> Dict[str, Any]: """Decodes and verifies the token.""" expected_issuer = f'{_FPNV_ISSUER}{self._project_id}' try: @@ -189,25 +192,25 @@ def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: issuer=expected_issuer ) except InvalidSignatureError as exception: - raise ValueError( + raise InvalidFpnvTokenError( 'The provided FPNV token has an invalid signature.' ) from exception except InvalidAudienceError as exception: - raise ValueError( + raise InvalidFpnvTokenError( 'The provided FPNV token has an incorrect "aud" (audience) claim. ' f'Expected payload to include {expected_issuer}.' ) from exception except InvalidIssuerError as exception: - raise ValueError( + raise InvalidFpnvTokenError( 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' f'Expected claim to include {expected_issuer}' ) from exception except ExpiredSignatureError as exception: - raise ValueError( + raise ExpiredFpnvTokenError( 'The provided FPNV token has expired.' ) from exception except InvalidTokenError as exception: - raise ValueError( + raise InvalidFpnvTokenError( f'Decoding FPNV token failed. Error: {exception}' ) from exception @@ -229,3 +232,16 @@ def check_string(cls, label: str, value: Any): """Checks if the given value is a string.""" if not isinstance(value, str) or not value: raise ValueError(f'{label} must be a non-empty string.') + +# Firebase Phone Number Verification (FPNV) Errors +class InvalidFpnvTokenError(exceptions.InvalidArgumentError): + """Raised when an FPNV token is invalid.""" + + def __init__(self, message, cause=None, http_response=None): + exceptions.InvalidArgumentError.__init__(self, message, cause, http_response) + +class ExpiredFpnvTokenError(InvalidFpnvTokenError): + """Raised when an FPNV token is expired.""" + + def __init__(self, message, cause=None, http_response=None): + InvalidFpnvTokenError.__init__(self, message, cause, http_response) diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py index bd0e3e7e..0f500a04 100644 --- a/tests/test_fpnv.py +++ b/tests/test_fpnv.py @@ -47,10 +47,6 @@ } -@pytest.fixture -def client(): - app = firebase_admin.get_app() - return fpnv.client(app) class TestCommon: @@ -72,49 +68,19 @@ def test_properties(self): assert token.sub == _PHONE_NUMBER assert token.issuer == _ISSUER assert token.audience == [_ISSUER] - assert token.exp == _MOCK_PAYLOAD['exp'] - assert token.iat == _MOCK_PAYLOAD['iat'] - assert token.claims == _MOCK_PAYLOAD + expected_claims = _MOCK_PAYLOAD.copy() + expected_claims['phone_number'] = _PHONE_NUMBER + assert token.claims == expected_claims assert token['other'] == _MOCK_PAYLOAD['other'] -class TestFpnvClient(TestCommon): - - def test_client(self): - app = firebase_admin.get_app() - client = fpnv.client(app) - assert isinstance(client, fpnv.FpnvClient) - assert client._project_id == _PROJECT_ID +class TestVerifyToken(TestCommon): - def test_requires_project_id(self): - cred = testutils.MockCredential() - # Create app without project ID - app = firebase_admin.initialize_app(cred, name='no_project_id') - # Mock credential to not have project_id + def test_no_project_id(self): + app = firebase_admin.initialize_app(testutils.MockCredential(), name='no_project_id') app.credential.get_credential().project_id = None - - with pytest.raises(ValueError, match='Project ID is required'): - fpnv.client(app) - - def test_client_default_app(self): - client = fpnv.client() - assert isinstance(client, fpnv.FpnvClient) - - def test_client_explicit_app(self): - cred = testutils.MockCredential() - app = firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}, name='custom') - client = fpnv.client(app) - assert isinstance(client, fpnv.FpnvClient) - - def test_client_illegal_app_argument_wrong_app_type(self): - cred = testutils.MockCredential() - app = "" - with pytest.raises(ValueError, match = 'Illegal app argument. Argument must be of type firebase_admin.App, but given ' - f'"{type(app)}".'): - client = fpnv.client(app) - - -class TestVerifyToken(TestCommon): + with pytest.raises(ValueError, match='Project ID is required for FPNV'): + fpnv.verify_token('token', app=app) def test_verify_token_with_real_crypto(self): """Verifies a token signed with a real ES256 key pair. @@ -164,17 +130,26 @@ def to_b64url(b_data): mock_fetch.return_value = {'keys': [jwk]} app = firebase_admin.get_app() - client = fpnv.client(app) - decoded_token = client.verify_token(token) + decoded_token = fpnv.verify_token(token, app) assert decoded_token['sub'] == _PHONE_NUMBER assert _ISSUER in decoded_token['aud'] assert decoded_token.phone_number == decoded_token['sub'] + # Test convenience dictionary lookup + assert decoded_token['phone_number'] == _PHONE_NUMBER + + def test_verify_token_module_level_delegation(self): + """Verifies module-level verify_token delegates correctly.""" + with patch('firebase_admin.fpnv._FpnvService.verify_token') as mock_verify: + mock_verify.return_value = 'mock-result' + res = fpnv.verify_token('some-token') + assert res == 'mock-result' + mock_verify.assert_called_once_with('some-token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') - def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, client): + def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls): token_str = 'valid.token.string' # Mock Header mock_header.return_value = {'kid': 'key1', 'typ': 'JWT', 'alg': 'ES256'} @@ -184,12 +159,13 @@ def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, cli mock_signing_key = mock.Mock() mock_signing_key.key = _PUBLIC_KEY mock_jwks_instance.get_signing_key_from_jwt.return_value = mock_signing_key - client._verifier._jwks_client = mock_jwks_instance + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance mock_decode.return_value = _MOCK_PAYLOAD # Execute - token = client.verify_token(token_str) + token = fpnv.verify_token(token_str) # Verify assert isinstance(token, fpnv.FpnvToken) @@ -208,35 +184,32 @@ def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls, cli @mock.patch('jwt.get_unverified_header') def test_verify_token_no_name(self, mock_header): app = firebase_admin.get_app() - client = fpnv.client(app) mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} with pytest.raises(ValueError, match="must be a non-empty string"): - client.verify_token('') + fpnv.verify_token('', app=app) @mock.patch('jwt.get_unverified_header') def test_verify_token_no_kid(self, mock_header): app = firebase_admin.get_app() - client = fpnv.client(app) mock_header.return_value = {'typ': 'JWT', 'alg': 'ES256'} # Missing kid - with pytest.raises(ValueError, match="FPNV has no 'kid' claim."): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="FPNV has no 'kid' claim."): + fpnv.verify_token('token', app=app) @mock.patch('jwt.get_unverified_header') - def test_verify_token_wrong_alg(self, mock_header, client): + def test_verify_token_wrong_alg(self, mock_header): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'RS256'} # Wrong alg - with pytest.raises(ValueError, match="incorrect alg"): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect alg"): + fpnv.verify_token('token') @mock.patch('jwt.get_unverified_header') - def test_verify_token_wrong_typ(self, mock_header, client): + def test_verify_token_wrong_typ(self, mock_header): mock_header.return_value = {'kid': 'k', 'typ': 'WRONG', 'alg': 'ES256'} # wrong typ - with pytest.raises(ValueError, match="incorrect type header"): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect type header"): + fpnv.verify_token('token') - def test_verify_token_jwk_error(self, client): - # Access the ACTUAL client instance used by the verifier - # (Assuming internal structure: client -> _verifier -> _jwks_client) - jwks_client = client._verifier._jwks_client + def test_verify_token_jwk_error(self): + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + jwks_client = service._verifier._jwks_client # Mock the method on the existing instance with mock.patch.object(jwks_client, 'get_signing_key_from_jwt') as mock_method: @@ -246,80 +219,85 @@ def test_verify_token_jwk_error(self, client): with mock.patch('jwt.get_unverified_header') as mock_header: mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} - with pytest.raises(ValueError, match="Verifying FPNV token failed"): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="Verifying FPNV token failed"): + fpnv.verify_token('token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') - def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls, client): + def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY - client._verifier._jwks_client = mock_jwks_instance + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance # Simulate ExpiredSignatureError mock_decode.side_effect = jwt.ExpiredSignatureError("Expired") - with pytest.raises(ValueError, match="token has expired"): - client.verify_token('token') + with pytest.raises(fpnv.ExpiredFpnvTokenError, match="token has expired"): + fpnv.verify_token('token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') - def test_verify_token_invalid_signature(self, mock_header, mock_decode, mock_jwks_cls, client): + def test_verify_token_invalid_signature(self, mock_header, mock_decode, mock_jwks_cls): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY - client._verifier._jwks_client = mock_jwks_instance + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance # Simulate InvalidSignatureError mock_decode.side_effect = jwt.InvalidSignatureError("Wrong Signature") - with pytest.raises(ValueError, match="invalid signature"): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="invalid signature"): + fpnv.verify_token('token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') - def test_verify_token_invalid_audience(self, mock_header, mock_decode, mock_jwks_cls, client): + def test_verify_token_invalid_audience(self, mock_header, mock_decode, mock_jwks_cls): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY - client._verifier._jwks_client = mock_jwks_instance + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance # Simulate InvalidAudienceError mock_decode.side_effect = jwt.InvalidAudienceError("Wrong Aud") - with pytest.raises(ValueError, match="incorrect \"aud\""): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect \"aud\""): + fpnv.verify_token('token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') - def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_cls, client): + def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_cls): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY - client._verifier._jwks_client = mock_jwks_instance + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance # Simulate InvalidIssuerError mock_decode.side_effect = jwt.InvalidIssuerError("Wrong Iss") - with pytest.raises(ValueError, match="incorrect \"iss\""): - client.verify_token('token') + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect \"iss\""): + fpnv.verify_token('token') @mock.patch('jwt.PyJWKClient') @mock.patch('jwt.decode') @mock.patch('jwt.get_unverified_header') - def test_verify_token_invalid_token(self, mock_header, mock_decode, mock_jwks_cls, client): + def test_verify_token_invalid_token(self, mock_header, mock_decode, mock_jwks_cls): mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} mock_jwks_instance = mock_jwks_cls.return_value mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY - client._verifier._jwks_client = mock_jwks_instance + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance # Simulate InvalidTokenError mock_decode.side_effect = jwt.InvalidTokenError("Decoding FPNV token failed") - with pytest.raises(ValueError, match="incorrect \"iss\""): - client.verify_token('token') \ No newline at end of file + with pytest.raises(fpnv.InvalidFpnvTokenError, match="Decoding FPNV token failed"): + fpnv.verify_token('token') From 3f662a2cfed99fe58fc187049872e3cab8a9d868 Mon Sep 17 00:00:00 2001 From: jonathanedey Date: Mon, 23 Mar 2026 17:46:05 -0400 Subject: [PATCH 12/13] fix: Address gemini code review --- firebase_admin/fpnv.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index 28318700..da8bd58a 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -109,12 +109,6 @@ class _FpnvService: def __init__(self, app): self._project_id = app.project_id - - if not self._project_id: - cred = app.credential.get_credential() - if hasattr(cred, 'project_id'): - self._project_id = cred.project_id - if not self._project_id: raise ValueError( 'Project ID is required for FPNV. Please ensure the app is ' @@ -171,7 +165,10 @@ def _validate_headers(self, headers: Any) -> None: raise InvalidFpnvTokenError("FPNV has no 'kid' claim.") if headers.get('typ') != 'JWT': - raise InvalidFpnvTokenError("The provided FPNV token has an incorrect type header") + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect type header. ' \ + f"Expected 'JWT' but got {headers.get('typ')!r}." + ) algorithm = headers.get('alg') if algorithm != _ALGORITHM_ES256: @@ -198,12 +195,12 @@ def _decode_and_verify(self, token, signing_key) -> Dict[str, Any]: except InvalidAudienceError as exception: raise InvalidFpnvTokenError( 'The provided FPNV token has an incorrect "aud" (audience) claim. ' - f'Expected payload to include {expected_issuer}.' + f'Expected {expected_issuer}.' ) from exception except InvalidIssuerError as exception: raise InvalidFpnvTokenError( 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' - f'Expected claim to include {expected_issuer}' + f'Expected {expected_issuer}.' ) from exception except ExpiredSignatureError as exception: raise ExpiredFpnvTokenError( From c5e2bd5a315abff32adbb0e9d48a84ecb79d3b86 Mon Sep 17 00:00:00 2001 From: jonathanedey Date: Tue, 24 Mar 2026 10:48:20 -0400 Subject: [PATCH 13/13] fix: error doc strings --- firebase_admin/fpnv.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py index da8bd58a..282c3e3f 100644 --- a/firebase_admin/fpnv.py +++ b/firebase_admin/fpnv.py @@ -48,6 +48,7 @@ def verify_token(token: str, app: Optional[App] = None) -> FpnvToken: FpnvToken: The verified token claims. Raises: + ValueError: If the token is not a string or is empty. InvalidFpnvTokenError: If the token is invalid or malformed. ExpiredFpnvTokenError: If the token has expired. """ @@ -129,7 +130,9 @@ def verify_token(self, token) -> FpnvToken: FpnvToken: The verified token claims. Raises: - ValueError: If the token is invalid or malformed. + ValueError: If the token is not a string or is empty. + InvalidFpnvTokenError: If the token is invalid or malformed. + ExpiredFpnvTokenError: If the token has expired. """ return FpnvToken(self._verifier.verify(token)) @@ -211,9 +214,12 @@ def _decode_and_verify(self, token, signing_key) -> Dict[str, Any]: f'Decoding FPNV token failed. Error: {exception}' ) from exception - _Validators.check_string( - 'The provided FPNV token "sub" (subject) claim', - payload.get('sub')) + sub_claim = payload.get('sub') + if not isinstance(sub_claim, str) or not sub_claim: + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect "sub" (subject) claim. ' + 'Expected a non-empty string.' + ) return payload