diff --git a/firebase_admin/fpnv.py b/firebase_admin/fpnv.py new file mode 100644 index 00000000..282c3e3f --- /dev/null +++ b/firebase_admin/fpnv.py @@ -0,0 +1,250 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Firebase Phone Number Verification (FPNV) module. + +This module contains functions for verifying JWTs related to the Firebase +Phone Number Verification (FPNV) service. +""" +from __future__ import annotations +from typing import Any, Dict, Optional + +import jwt +from jwt import ( + PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, + PyJWKClientError, InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError +) + +from firebase_admin import App, _utils, exceptions + +_FPNV_ATTRIBUTE = '_fpnv' +_FPNV_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' +_FPNV_ISSUER = 'https://fpnv.googleapis.com/projects/' +_ALGORITHM_ES256 = 'ES256' + + +def _get_fpnv_service(app): + return _utils.get_app_service(app, _FPNV_ATTRIBUTE, _FpnvService) + +def verify_token(token: str, app: Optional[App] = None) -> FpnvToken: + """Verifies a Firebase Phone Number Verification (FPNV) token. + + Args: + token: A string containing the FPNV JWT. + app: An App instance (optional). + + Returns: + FpnvToken: The verified token claims. + + Raises: + ValueError: If the token is not a string or is empty. + InvalidFpnvTokenError: If the token is invalid or malformed. + ExpiredFpnvTokenError: If the token has expired. + """ + return _get_fpnv_service(app).verify_token(token) + + +class FpnvToken(dict): + """Represents a verified FPNV token. + + This class behaves like a dictionary, allowing access to the decoded claims. + It also provides convenience properties for common claims. + """ + + def __init__(self, claims): + super().__init__(claims) + self['phone_number'] = claims.get('sub') + + @property + def phone_number(self) -> str: + """Returns the phone number of the user. + This corresponds to the 'sub' claim in the JWT. + """ + return self.get('sub') + + @property + def issuer(self) -> str: + """Returns the issuer identifier for the issuer of the response.""" + return self.get('iss') + + @property + def audience(self) -> str: + """Returns the audience for which this token is intended.""" + return self.get('aud') + + @property + def exp(self) -> int: + """Returns the expiration time since the Unix epoch.""" + return self.get('exp') + + @property + def iat(self) -> int: + """Returns the issued-at time since the Unix epoch.""" + return self.get('iat') + + @property + def sub(self) -> str: + """Returns the sub (subject) of the token, which is the phone number.""" + return self.get('sub') + + @property + def claims(self): + """Returns the entire map of claims.""" + return self + + +class _FpnvService: + """Service class that implements Firebase Phone Number Verification functionality.""" + _project_id = None + + def __init__(self, app): + self._project_id = app.project_id + if not self._project_id: + raise ValueError( + 'Project ID is required for FPNV. Please ensure the app is ' + 'initialized with a credential that contains a project ID.' + ) + + self._verifier = _FpnvTokenVerifier(self._project_id) + + def verify_token(self, token) -> FpnvToken: + """Verifies the given FPNV token. + + Verifies the signature, expiration, and claims of the token. + + Args: + token: A string containing the FPNV JWT. + + Returns: + FpnvToken: The verified token claims. + + Raises: + ValueError: If the token is not a string or is empty. + InvalidFpnvTokenError: If the token is invalid or malformed. + ExpiredFpnvTokenError: If the token has expired. + """ + return FpnvToken(self._verifier.verify(token)) + + +class _FpnvTokenVerifier: + """Internal class for verifying FPNV JWTs signed with ES256.""" + _jwks_client = None + _project_id = None + + def __init__(self, project_id): + self._project_id = project_id + self._jwks_client = PyJWKClient(_FPNV_JWKS_URL, lifespan=21600) + + def verify(self, token) -> Dict[str, Any]: + """Verifies the given FPNV token.""" + _Validators.check_string("FPNV check token", token) + try: + self._validate_headers(jwt.get_unverified_header(token)) + signing_key = self._jwks_client.get_signing_key_from_jwt(token) + claims = self._decode_and_verify(token, signing_key.key) + except (InvalidTokenError, DecodeError, PyJWKClientError) as exception: + raise InvalidFpnvTokenError( + 'Verifying FPNV token failed.', + cause=exception, + http_response=getattr(exception, 'http_response', None) + ) from exception + + return claims + + def _validate_headers(self, headers: Any) -> None: + """Validates the headers.""" + if headers.get('kid') is None: + raise InvalidFpnvTokenError("FPNV has no 'kid' claim.") + + if headers.get('typ') != 'JWT': + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect type header. ' \ + f"Expected 'JWT' but got {headers.get('typ')!r}." + ) + + algorithm = headers.get('alg') + if algorithm != _ALGORITHM_ES256: + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect alg header. ' + f'Expected {_ALGORITHM_ES256} but got {algorithm}.' + ) + + def _decode_and_verify(self, token, signing_key) -> Dict[str, Any]: + """Decodes and verifies the token.""" + expected_issuer = f'{_FPNV_ISSUER}{self._project_id}' + try: + payload = jwt.decode( + token, + signing_key, + algorithms=[_ALGORITHM_ES256], + audience=expected_issuer, + issuer=expected_issuer + ) + except InvalidSignatureError as exception: + raise InvalidFpnvTokenError( + 'The provided FPNV token has an invalid signature.' + ) from exception + except InvalidAudienceError as exception: + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect "aud" (audience) claim. ' + f'Expected {expected_issuer}.' + ) from exception + except InvalidIssuerError as exception: + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' + f'Expected {expected_issuer}.' + ) from exception + except ExpiredSignatureError as exception: + raise ExpiredFpnvTokenError( + 'The provided FPNV token has expired.' + ) from exception + except InvalidTokenError as exception: + raise InvalidFpnvTokenError( + f'Decoding FPNV token failed. Error: {exception}' + ) from exception + + sub_claim = payload.get('sub') + if not isinstance(sub_claim, str) or not sub_claim: + raise InvalidFpnvTokenError( + 'The provided FPNV token has an incorrect "sub" (subject) claim. ' + 'Expected a non-empty string.' + ) + + return payload + + +class _Validators: + """A collection of data validation utilities. + + Methods provided in this class raise ``ValueErrors`` if any validations fail. + """ + + @classmethod + def check_string(cls, label: str, value: Any): + """Checks if the given value is a string.""" + if not isinstance(value, str) or not value: + raise ValueError(f'{label} must be a non-empty string.') + +# Firebase Phone Number Verification (FPNV) Errors +class InvalidFpnvTokenError(exceptions.InvalidArgumentError): + """Raised when an FPNV token is invalid.""" + + def __init__(self, message, cause=None, http_response=None): + exceptions.InvalidArgumentError.__init__(self, message, cause, http_response) + +class ExpiredFpnvTokenError(InvalidFpnvTokenError): + """Raised when an FPNV token is expired.""" + + def __init__(self, message, cause=None, http_response=None): + InvalidFpnvTokenError.__init__(self, message, cause, http_response) diff --git a/tests/test_fpnv.py b/tests/test_fpnv.py new file mode 100644 index 00000000..0f500a04 --- /dev/null +++ b/tests/test_fpnv.py @@ -0,0 +1,303 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test cases for the firebase_admin.fpnv module.""" + +import base64 +import time +from unittest import mock +from unittest.mock import patch + +import jwt +import pytest +from cryptography.hazmat.primitives.asymmetric import ec + +import firebase_admin +from firebase_admin import fpnv +from tests import testutils + +# Mock Data +_PROJECT_ID = 'mock-project-id' +_EXP_TIMESTAMP = 2000000000 +_ISSUER = f'https://fpnv.googleapis.com/projects/{_PROJECT_ID}' +_PHONE_NUMBER = '+1234567890' +_PUBLIC_KEY = 'test-public-key' # In real tests, use the corresponding public key +_ALGORITHM = 'ES256' +_KEY_ID = 'test-key-id' +_TYPE = 'JWT' + +_MOCK_PAYLOAD = { + 'iss': _ISSUER, + 'sub': _PHONE_NUMBER, + 'aud': [_ISSUER], + 'exp': _EXP_TIMESTAMP, + 'iat': _EXP_TIMESTAMP - 3600, + "other": 'other' +} + + + + +class TestCommon: + @classmethod + def setup_class(cls): + cred = testutils.MockCredential() + firebase_admin.initialize_app(cred, {'projectId': _PROJECT_ID}) + + @classmethod + def teardown_class(cls): + testutils.cleanup_apps() + + +class TestFpnvToken: + def test_properties(self): + token = fpnv.FpnvToken(_MOCK_PAYLOAD) + + assert token.phone_number == _PHONE_NUMBER + assert token.sub == _PHONE_NUMBER + assert token.issuer == _ISSUER + assert token.audience == [_ISSUER] + expected_claims = _MOCK_PAYLOAD.copy() + expected_claims['phone_number'] = _PHONE_NUMBER + assert token.claims == expected_claims + assert token['other'] == _MOCK_PAYLOAD['other'] + + +class TestVerifyToken(TestCommon): + + def test_no_project_id(self): + app = firebase_admin.initialize_app(testutils.MockCredential(), name='no_project_id') + app.credential.get_credential().project_id = None + with pytest.raises(ValueError, match='Project ID is required for FPNV'): + fpnv.verify_token('token', app=app) + + def test_verify_token_with_real_crypto(self): + """Verifies a token signed with a real ES256 key pair. + + Mocking only the JWKS endpoint. + This ensures the cryptographic verification logic is functioning correctly. + """ + # Generate a real ES256 key pair + private_key = ec.generate_private_key(ec.SECP256R1()) + public_key = private_key.public_key() + + # Create the JWK representation of the public key (for the mock endpoint) + # Note: Retrieving numbers from the key involves cryptography primitives + public_numbers = public_key.public_numbers() + + def to_b64url(b_data): + return base64.urlsafe_b64encode(b_data).rstrip(b'=').decode('utf-8') + + jwk = { + "kty": "EC", + "use": "sig", + "alg": _ALGORITHM, + "kid": _KEY_ID, + "crv": "P-256", + "x": to_b64url(public_numbers.x.to_bytes(32, 'big')), + "y": to_b64url(public_numbers.y.to_bytes(32, 'big')), + } + now = int(time.time()) + payload = { + 'iss': _ISSUER, + 'aud': [_ISSUER], + 'iat': now, + 'exp': now + 3600, + 'sub': _PHONE_NUMBER + } + + # Sign using the private key object directly (PyJWT supports this) + token = jwt.encode( + payload, + private_key, + algorithm=_ALGORITHM, + headers={'alg': _ALGORITHM, 'typ': _TYPE, 'kid': _KEY_ID}, + ) + + # Mock PyJWKClient fetch_data + with patch('jwt.PyJWKClient.fetch_data') as mock_fetch: + mock_fetch.return_value = {'keys': [jwk]} + + app = firebase_admin.get_app() + decoded_token = fpnv.verify_token(token, app) + + assert decoded_token['sub'] == _PHONE_NUMBER + assert _ISSUER in decoded_token['aud'] + assert decoded_token.phone_number == decoded_token['sub'] + # Test convenience dictionary lookup + assert decoded_token['phone_number'] == _PHONE_NUMBER + + def test_verify_token_module_level_delegation(self): + """Verifies module-level verify_token delegates correctly.""" + with patch('firebase_admin.fpnv._FpnvService.verify_token') as mock_verify: + mock_verify.return_value = 'mock-result' + res = fpnv.verify_token('some-token') + assert res == 'mock-result' + mock_verify.assert_called_once_with('some-token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_success(self, mock_header, mock_decode, mock_jwks_cls): + token_str = 'valid.token.string' + # Mock Header + mock_header.return_value = {'kid': 'key1', 'typ': 'JWT', 'alg': 'ES256'} + + # Mock Signing Key + mock_jwks_instance = mock_jwks_cls.return_value + mock_signing_key = mock.Mock() + mock_signing_key.key = _PUBLIC_KEY + mock_jwks_instance.get_signing_key_from_jwt.return_value = mock_signing_key + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance + + mock_decode.return_value = _MOCK_PAYLOAD + + # Execute + token = fpnv.verify_token(token_str) + + # Verify + assert isinstance(token, fpnv.FpnvToken) + assert token.phone_number == _PHONE_NUMBER + + mock_header.assert_called_with(token_str) + mock_jwks_instance.get_signing_key_from_jwt.assert_called_with(token_str) + mock_decode.assert_called_with( + token_str, + _PUBLIC_KEY, + algorithms=['ES256'], + audience=_ISSUER, + issuer=_ISSUER + ) + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_no_name(self, mock_header): + app = firebase_admin.get_app() + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + with pytest.raises(ValueError, match="must be a non-empty string"): + fpnv.verify_token('', app=app) + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_no_kid(self, mock_header): + app = firebase_admin.get_app() + mock_header.return_value = {'typ': 'JWT', 'alg': 'ES256'} # Missing kid + with pytest.raises(fpnv.InvalidFpnvTokenError, match="FPNV has no 'kid' claim."): + fpnv.verify_token('token', app=app) + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_wrong_alg(self, mock_header): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'RS256'} # Wrong alg + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect alg"): + fpnv.verify_token('token') + + @mock.patch('jwt.get_unverified_header') + def test_verify_token_wrong_typ(self, mock_header): + mock_header.return_value = {'kid': 'k', 'typ': 'WRONG', 'alg': 'ES256'} # wrong typ + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect type header"): + fpnv.verify_token('token') + + def test_verify_token_jwk_error(self): + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + jwks_client = service._verifier._jwks_client + + # Mock the method on the existing instance + with mock.patch.object(jwks_client, 'get_signing_key_from_jwt') as mock_method: + mock_method.side_effect = jwt.PyJWKClientError("Key not found") + + # Mock header is still needed if _get_signing_key calls it before the client + with mock.patch('jwt.get_unverified_header') as mock_header: + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + + with pytest.raises(fpnv.InvalidFpnvTokenError, match="Verifying FPNV token failed"): + fpnv.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_expired(self, mock_header, mock_decode, mock_jwks_cls): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance + + # Simulate ExpiredSignatureError + mock_decode.side_effect = jwt.ExpiredSignatureError("Expired") + + with pytest.raises(fpnv.ExpiredFpnvTokenError, match="token has expired"): + fpnv.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_signature(self, mock_header, mock_decode, mock_jwks_cls): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidSignatureError + mock_decode.side_effect = jwt.InvalidSignatureError("Wrong Signature") + + with pytest.raises(fpnv.InvalidFpnvTokenError, match="invalid signature"): + fpnv.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_audience(self, mock_header, mock_decode, mock_jwks_cls): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidAudienceError + mock_decode.side_effect = jwt.InvalidAudienceError("Wrong Aud") + + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect \"aud\""): + fpnv.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_issuer(self, mock_header, mock_decode, mock_jwks_cls): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidIssuerError + mock_decode.side_effect = jwt.InvalidIssuerError("Wrong Iss") + + with pytest.raises(fpnv.InvalidFpnvTokenError, match="incorrect \"iss\""): + fpnv.verify_token('token') + + @mock.patch('jwt.PyJWKClient') + @mock.patch('jwt.decode') + @mock.patch('jwt.get_unverified_header') + def test_verify_token_invalid_token(self, mock_header, mock_decode, mock_jwks_cls): + mock_header.return_value = {'kid': 'k', 'typ': 'JWT', 'alg': 'ES256'} + mock_jwks_instance = mock_jwks_cls.return_value + mock_jwks_instance.get_signing_key_from_jwt.return_value.key = _PUBLIC_KEY + service = fpnv._get_fpnv_service(firebase_admin.get_app()) + service._verifier._jwks_client = mock_jwks_instance + + # Simulate InvalidTokenError + mock_decode.side_effect = jwt.InvalidTokenError("Decoding FPNV token failed") + + with pytest.raises(fpnv.InvalidFpnvTokenError, match="Decoding FPNV token failed"): + fpnv.verify_token('token')