Add error logging for integrations.gitea.middlewares.signature.GiteaSignatureMiddleware

This commit is contained in:
2026-05-06 01:35:59 +02:00
parent 2db98caaef
commit 2da8da9f26
@@ -14,7 +14,9 @@
import hmac import hmac
import hashlib import hashlib
from fastapi import Request, HTTPException import logging
from typing import Optional
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
@@ -22,6 +24,37 @@ __all__ = [
'GiteaSignatureMiddleware' 'GiteaSignatureMiddleware'
] ]
def _get_client_host(request: Request, proxy: bool = False, proxy_host_header: Optional[str] = 'X-Forwarded-For') -> str:
'''
Returns the client host.
If ``proxy`` is ``True``, the header that is specified in
``proxy_host_header`` is used to get the client host. Otherwise, the
real client host is returned.
:param request: The request object.
:type request: Request
:param proxy: Whether a proxy is used.
:type proxy: bool
:param proxy_host_header: The header set by the proxy containing the
original client host.
:type proxy_host_header: Optional[str]
:raises ValueError: If ``proxy`` is ``True`` and
``proxy_host_header`` is ``None``.
:return: The client host.
:rtype: str
'''
if proxy:
if not proxy_host_header:
raise ValueError('Passing a proxy host header is necessary if proxy is True')
return request.headers.get(proxy_host_header)
else:
return request.client.host
class GiteaSignatureMiddleware(BaseHTTPMiddleware): class GiteaSignatureMiddleware(BaseHTTPMiddleware):
''' '''
A middleware to verify the Gitea signature. A middleware to verify the Gitea signature.
@@ -29,10 +62,13 @@ class GiteaSignatureMiddleware(BaseHTTPMiddleware):
:param app: The (FastAPI) app. :param app: The (FastAPI) app.
:param secret: The secret. :param secret: The secret.
:type secret: bytes :type secret: bytes
:param logger: The logger.
:type logger: Optional[logging.Logger]
''' '''
def __init__(self, app, secret: bytes) -> None: def __init__(self, app, secret: bytes, logger: Optional[logging.Logger] = None) -> None:
super().__init__(app) super().__init__(app)
self.secret = secret self.secret = secret
self.logger = logger
async def dispatch(self, request: Request, call_next): async def dispatch(self, request: Request, call_next):
''' '''
@@ -41,11 +77,14 @@ class GiteaSignatureMiddleware(BaseHTTPMiddleware):
:param request: The request. :param request: The request.
:type request: Request :type request: Request
''' '''
if request.url.path.startswith('/webhook/gitea'): if request.url.path.startswith('/webhook/gitea'):
signature = request.headers.get('X-Gitea-Signature') signature = request.headers.get('X-Gitea-Signature')
if not signature: if not signature:
if self.logger is not None:
self.logger.error(f'Permission denied: no signature (client: {_get_client_host(request)})')
return JSONResponse( return JSONResponse(
status_code = 401, status_code = 401,
content = {'detail': 'Invalid signature'} content = {'detail': 'Invalid signature'}
@@ -61,6 +100,8 @@ class GiteaSignatureMiddleware(BaseHTTPMiddleware):
).hexdigest()}', ).hexdigest()}',
signature signature
): ):
if self.logger is not None:
self.logger.error(f'Permission denied: invalid signature (client: {_get_client_host(request)})')
return JSONResponse( return JSONResponse(
status_code = 401, status_code = 401,
content = {'detail': 'Invalid signature'} content = {'detail': 'Invalid signature'}