# Copyright 2026 jCloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hmac
import hashlib
import logging
from typing import Optional

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


__all__ = [
    'GiteaSignatureMiddleware'
]


def _get_client_host(request: Request, proxy: bool = False, proxy_host_header: Optional[str] = 'X-Forwarded-For') -> str:
    '''
    Returns the client host.

    If ``proxy`` is ``True``, the value of the header named by ``proxy_host_header``
    is returned when that header is present and non-empty. In every other case the
    host of the directly connected peer is returned, or ``'unknown'`` if the server
    did not provide any peer information.

    :param request: The request object.
    :type request: Request
    :param proxy: Whether a proxy is used.
    :type proxy: bool
    :param proxy_host_header: The header set by the proxy containing the original client host.
    :type proxy_host_header: Optional[str]
    :raises ValueError: If ``proxy`` is ``True`` and ``proxy_host_header`` is ``None`` or empty.
    :return: The client host.
    :rtype: str
    '''

    if proxy:
        if not proxy_host_header:
            raise ValueError('Passing a proxy host header is necessary if proxy is True')

        forwarded_host = request.headers.get(proxy_host_header)

        # The header is raw and untrusted: it is returned as sent (it may hold a
        # comma separated chain of hosts). If it is missing, fall through to the
        # direct peer instead of returning None from a function annotated as str.
        if forwarded_host:
            return forwarded_host

    # ``request.client`` is None when the server supplies no peer address; this
    # function is used while logging rejections and must not raise there.
    peer = request.client

    if peer is None:
        return 'unknown'

    return peer.host


class GiteaSignatureMiddleware(BaseHTTPMiddleware):
    '''
    A middleware to verify the Gitea signature.

    Requests whose path starts with ``/gitea/webhook`` must carry an
    ``X-Gitea-Signature`` header matching the HMAC-SHA256 of the raw request body,
    keyed with ``secret``. All other requests are passed through untouched.

    :param app: The (FastAPI) app.
    :param secret: The secret.
    :type secret: bytes
    :param logger: The logger.
    :type logger: Optional[logging.Logger]
    '''

    def __init__(self, app, secret: bytes, logger: Optional[logging.Logger] = None) -> None:
        super().__init__(app)

        self.secret = secret
        self.logger = logger

    def _reject(self, request: Request, reason: str) -> JSONResponse:
        '''
        Logs a rejected request (if a logger is configured) and builds the 401 response.

        :param request: The rejected request.
        :type request: Request
        :param reason: A short reason that is inserted into the log message.
        :type reason: str
        :return: The ``401`` response.
        :rtype: JSONResponse
        '''

        if self.logger is not None:
            self.logger.error('Permission denied: %s (client: %s)', reason, _get_client_host(request))

        return JSONResponse(
            status_code = 401,
            content = {'detail': 'Invalid signature'}
        )

    def _signature_matches(self, body: bytes, signature: str) -> bool:
        '''
        Checks the received signature against the HMAC-SHA256 of ``body``.

        Both the bare hex digest and the ``sha256=`` prefixed hex digest are accepted.

        :param body: The raw request body.
        :type body: bytes
        :param signature: The value of the signature header.
        :type signature: str
        :return: Whether the signature is valid.
        :rtype: bool
        '''

        digest = hmac.new(self.secret, body, hashlib.sha256).hexdigest()

        # Compare bytes: ``hmac.compare_digest`` raises TypeError for str arguments
        # containing non-ASCII characters, which an attacker controlled header may
        # contain.
        received = signature.encode('utf-8')

        # Evaluate both comparisons unconditionally so that the amount of work does
        # not depend on which format matched.
        matches_bare = hmac.compare_digest(digest.encode('ascii'), received)
        matches_prefixed = hmac.compare_digest(f'sha256={digest}'.encode('ascii'), received)

        return matches_bare or matches_prefixed

    async def dispatch(self, request: Request, call_next):
        '''
        Dispatch.

        Verifies the signature of webhook requests and answers with ``401`` if it is
        missing or invalid; otherwise the request is forwarded to the next handler.

        :param request: The request.
        :type request: Request
        :param call_next: The callable that forwards the request to the next handler.
        :return: The ``401`` response or the response of the next handler.
        '''

        if not request.url.path.startswith('/gitea/webhook'):
            return await call_next(request)

        signature = request.headers.get('X-Gitea-Signature')

        if not signature:
            return self._reject(request, 'no signature')

        # NOTE(review): the body is consumed here before ``call_next``; this relies
        # on Starlette making the body available again to downstream handlers -
        # confirm for the Starlette version in use.
        body = await request.body()

        if not self._signature_matches(body, signature):
            return self._reject(request, 'invalid signature')

        return await call_next(request)