generated from jCloud/repository-template
Divide API into modules
This commit is contained in:
@@ -12,144 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from fastapi import FastAPI
|
||||
import os
|
||||
import uvicorn
|
||||
import argparse
|
||||
import ipaddress
|
||||
import jcloud_config_parser
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
def existing_file(path: str) -> pathlib.Path:
|
||||
'''
|
||||
Checks whether a path is a file and returns it if it exists.
|
||||
|
||||
:param path: The path
|
||||
:type path: str
|
||||
|
||||
:raises argparse.ArgumentTypeError: If the path is a directory or does not exist.
|
||||
|
||||
:return: The path
|
||||
:rtype: pathlib.Path
|
||||
'''
|
||||
|
||||
if os.path.isdir(path):
|
||||
raise argparse.ArgumentTypeError(f'\'{path}\': Is a directory')
|
||||
if not os.path.isfile(path):
|
||||
raise argparse.ArgumentTypeError(f'\'{path}\': No such file or directory')
|
||||
return pathlib.Path(path)
|
||||
|
||||
# parse arguments
|
||||
|
||||
argument_parser = argparse.ArgumentParser(description='The jCloud deployment server.')
|
||||
argument_parser.add_argument('-c', '--config', type=existing_file, default=pathlib.Path(os.path.dirname(__file__) + '/../../config/'), help='The directory containing the configuration files.')
|
||||
argument_parser.add_argument('-H', '--host', type=ipaddress.ip_address, default=None, help='The host to listen')
|
||||
argument_parser.add_argument('-p', '--port', type=int, default=None, help='The port to listen')
|
||||
args = argument_parser.parse_args()
|
||||
|
||||
# load default configuration
|
||||
with open(os.path.dirname(__file__) + '/../../default_configuration.conf') as f:
|
||||
default_configuration = jcloud_config_parser.ini.INIConfiguration.from_string(f.read())
|
||||
f.close()
|
||||
|
||||
# load configuration
|
||||
configuration = jcloud_config_parser.ini.INIConfiguration.from_string(
|
||||
(args.config / 'server.conf').read_text(),
|
||||
default=default_configuration,
|
||||
ignore_errors=True
|
||||
)
|
||||
|
||||
logfile = configuration.logging.logfile
|
||||
if os.path.isdir(logfile):
|
||||
logfile = None
|
||||
if not logfile:
|
||||
logfile = None
|
||||
|
||||
LOGLEVEL = logging._nameToLevel.get(configuration.logging.loglevel.upper(), logging._nameToLevel.get(default_configuration.logging.loglevel.upper(), logging.INFO))
|
||||
|
||||
# set up logging
|
||||
LOGGING_FORMAT = '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
|
||||
logging.basicConfig(level=LOGLEVEL, format=LOGGING_FORMAT)
|
||||
logger = logging.getLogger('api')
|
||||
logger.propagate = False
|
||||
logger.handlers = list()
|
||||
|
||||
formatter = logging.Formatter(LOGGING_FORMAT)
|
||||
if logfile is not None:
|
||||
logger_file_handler = logging.FileHandler(logfile)
|
||||
logger_file_handler.setFormatter(formatter)
|
||||
logger.addHandler(logger_file_handler)
|
||||
|
||||
def validate_host(host: str) -> bool:
|
||||
'''
|
||||
Checks whether the address is a valid address (``ip:port``), e. g. ``0.0.0.0:3000``.
|
||||
|
||||
:param addr: The address
|
||||
:type addr: str
|
||||
|
||||
:return: ``True`` if the address is valid, otherwise ``False``.
|
||||
:rtype: bool
|
||||
'''
|
||||
|
||||
try:
|
||||
ipaddress.ip_address(host)
|
||||
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# host
|
||||
host = args.host or configuration.server.host
|
||||
if not validate_host(host):
|
||||
logger.error(f'Error in configuration: [server]host: \'{host}\' is not a valid host, using default value.')
|
||||
host = default_configuration.server.host
|
||||
|
||||
# port
|
||||
port = args.port or configuration.server.port
|
||||
if not port.isdigit():
|
||||
logger.error(f'Error in configuration: [server]port: \'{port}\' is not an integer, using default value.')
|
||||
port = default_configuration.server.port
|
||||
else:
|
||||
if not (0 <= int(port) <= 65535):
|
||||
logger.error(f'Error in configuration: [server]port: {port} is not between 0 and 65535, using default value.')
|
||||
port = default_configuration.server.port
|
||||
port = int(port)
|
||||
|
||||
# Initialize FastAPI
|
||||
app = FastAPI(
|
||||
docs_url=configuration.docs.swagger,
|
||||
redoc_url=configuration.docs.redoc,
|
||||
openapi_url=configuration.docs.openapi
|
||||
)
|
||||
from .api import main
|
||||
|
||||
if __name__ == '__main__':
|
||||
uvicorn.run(
|
||||
app,
|
||||
host = host,
|
||||
port = port,
|
||||
server_header = False,
|
||||
log_config = {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': True,
|
||||
'handlers': {
|
||||
'uvicorn_handler': {
|
||||
'class': 'logging.FileHandler',
|
||||
'filename': logfile,
|
||||
'formatter': 'uvicorn_formatter',
|
||||
},
|
||||
},
|
||||
'formatters': {
|
||||
'uvicorn_formatter': {
|
||||
'format': LOGGING_FORMAT
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
'uvicorn': {
|
||||
'handlers': ['uvicorn_handler'],
|
||||
'level': LOGLEVEL,
|
||||
'propagate': False
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
main.main()
|
||||
@@ -0,0 +1,62 @@
|
||||
# Copyright 2026 jCloud
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import pathlib
|
||||
import ipaddress
|
||||
|
||||
__all__ = [
|
||||
'parse_args'
|
||||
]
|
||||
|
||||
def existing_file(path: str) -> pathlib.Path:
|
||||
'''
|
||||
Checks whether a path is a file and returns it if it exists.
|
||||
|
||||
:param path: The path
|
||||
:type path: str
|
||||
|
||||
:raises argparse.ArgumentTypeError: If the path is a directory or does not exist.
|
||||
|
||||
:return: The path
|
||||
:rtype: pathlib.Path
|
||||
'''
|
||||
|
||||
if os.path.isdir(path):
|
||||
raise argparse.ArgumentTypeError(f'\'{path}\': Is a directory')
|
||||
if not os.path.isfile(path):
|
||||
raise argparse.ArgumentTypeError(f'\'{path}\': No such file or directory')
|
||||
return pathlib.Path(path)
|
||||
|
||||
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
'''
|
||||
Parses the arguments.
|
||||
|
||||
:param argv: The arguments.
|
||||
:type argv: list[str]
|
||||
|
||||
:return: The argument namespace.
|
||||
:rtype: argparse.Namespace
|
||||
'''
|
||||
|
||||
# parse arguments
|
||||
|
||||
argument_parser = argparse.ArgumentParser(description='The jCloud deployment server.')
|
||||
|
||||
argument_parser.add_argument('-c', '--config', type=existing_file, default=pathlib.Path(os.path.dirname(__file__) + '/../../../config/'), help='The directory containing the configuration files.')
|
||||
argument_parser.add_argument('-H', '--host', type=ipaddress.ip_address, default=None, help='The host to listen')
|
||||
argument_parser.add_argument('-p', '--port', type=int, default=None, help='The port to listen')
|
||||
|
||||
return argument_parser.parse_args(argv)
|
||||
@@ -0,0 +1,112 @@
|
||||
# Copyright 2026 jCloud
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import jcloud_config_parser
|
||||
import ipaddress
|
||||
import pathlib
|
||||
import logging
|
||||
import argparse
|
||||
|
||||
__all__ = [
|
||||
'load_config',
|
||||
'process_host_and_port'
|
||||
]
|
||||
|
||||
def load_config(config_directory: pathlib.Path) -> tuple[
|
||||
jcloud_config_parser.ini.INIConfiguration,
|
||||
jcloud_config_parser.ini.INIConfiguration
|
||||
]:
|
||||
'''
|
||||
Loads the configuration and the default configuration.
|
||||
|
||||
:param config_directory: The configuration directory.
|
||||
:type config_directory: pathlib.Path
|
||||
|
||||
:return: The configuration.
|
||||
:rtype: tuple[jcloud_config_parser.ini.INIConfiguration, jcloud_config_parser.ini.INIConfiguration]
|
||||
'''
|
||||
|
||||
# load default configuration
|
||||
with open(os.path.dirname(__file__) + '/../../../default_configuration.conf') as f:
|
||||
default_configuration = jcloud_config_parser.ini.INIConfiguration.from_string(f.read())
|
||||
f.close()
|
||||
|
||||
# load configuration
|
||||
configuration = jcloud_config_parser.ini.INIConfiguration.from_string(
|
||||
(config_directory / 'server.conf').read_text(),
|
||||
default=default_configuration,
|
||||
ignore_errors=True
|
||||
)
|
||||
|
||||
return configuration, default_configuration
|
||||
|
||||
def _validate_host(host: str) -> bool:
|
||||
'''
|
||||
Checks whether the address is a valid address (``ip:port``), e. g. ``0.0.0.0:3000``.
|
||||
|
||||
:param addr: The address
|
||||
:type addr: str
|
||||
|
||||
:return: ``True`` if the address is valid, otherwise ``False``.
|
||||
:rtype: bool
|
||||
'''
|
||||
|
||||
try:
|
||||
ipaddress.ip_address(host)
|
||||
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def process_host_and_port(
|
||||
configuration: jcloud_config_parser.ini.INIConfiguration,
|
||||
default_configuration: jcloud_config_parser.ini.INIConfiguration,
|
||||
args: argparse.Namespace,
|
||||
logger: logging.Logger
|
||||
) -> tuple[str, int]:
|
||||
'''
|
||||
Processes the host and port in the configuration.
|
||||
|
||||
Uses default values and the configuration data passed via
|
||||
command-line arguments as a fallback.
|
||||
|
||||
:param configuration: The configuration.
|
||||
:type configuration: jcloud_config_parser.ini.INIConfiguration
|
||||
:param default_configuration: The default configuration.
|
||||
:type default_configuration: jcloud_config_parser.ini.INIConfiguration
|
||||
:param args: The arguments.
|
||||
:type args: argparse.Namespace
|
||||
|
||||
:return: The host and the port
|
||||
:rtype: tuple[str, int]
|
||||
'''
|
||||
|
||||
# host
|
||||
host = args.host or configuration.server.host
|
||||
if not _validate_host(host):
|
||||
logger.error(f'Error in configuration: [server]host: \'{host}\' is not a valid host, using default value.')
|
||||
host = default_configuration.server.host
|
||||
|
||||
# port
|
||||
port = args.port or configuration.server.port
|
||||
if not port.isdigit():
|
||||
logger.error(f'Error in configuration: [server]port: \'{port}\' is not an integer, using default value.')
|
||||
port = default_configuration.server.port
|
||||
else:
|
||||
if not (0 <= int(port) <= 65535):
|
||||
logger.error(f'Error in configuration: [server]port: {port} is not between 0 and 65535, using default value.')
|
||||
port = default_configuration.server.port
|
||||
|
||||
return host, int(port)
|
||||
@@ -0,0 +1,70 @@
|
||||
# Copyright 2026 jCloud
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import jcloud_config_parser
|
||||
import logging
|
||||
import pathlib
|
||||
from dataclasses import dataclass
|
||||
|
||||
__all__ = [
|
||||
'setup_logging'
|
||||
]
|
||||
|
||||
@dataclass
|
||||
class LoggingInfo:
|
||||
logger: logging.Logger
|
||||
logfile: pathlib.Path
|
||||
format: str
|
||||
level: int
|
||||
|
||||
def setup_logging(configuration: jcloud_config_parser.ini.INIConfiguration) -> LoggingInfo:
|
||||
'''
|
||||
Sets up logging.
|
||||
|
||||
:param configuration: The configuration.
|
||||
:type configuration: jcloud_config_parser.ini.INIConfiguration
|
||||
|
||||
:return: The logger info.
|
||||
:rtype: LoggingInfo
|
||||
'''
|
||||
|
||||
logfile = configuration.logging.logfile
|
||||
if os.path.isdir(logfile) or not logfile:
|
||||
logfile = None
|
||||
|
||||
loglevel = logging._nameToLevel.get(
|
||||
configuration.logging.loglevel.upper(),
|
||||
logging.INFO
|
||||
)
|
||||
|
||||
# set up logging
|
||||
logging_format = '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
|
||||
logging.basicConfig(level=loglevel, format=logging_format)
|
||||
logger = logging.getLogger('api')
|
||||
logger.propagate = False
|
||||
logger.handlers = list()
|
||||
|
||||
formatter = logging.Formatter(logging_format)
|
||||
if logfile is not None:
|
||||
logger_file_handler = logging.FileHandler(logfile)
|
||||
logger_file_handler.setFormatter(formatter)
|
||||
logger.addHandler(logger_file_handler)
|
||||
|
||||
return LoggingInfo(
|
||||
logger,
|
||||
logfile,
|
||||
logging_format,
|
||||
loglevel
|
||||
)
|
||||
@@ -0,0 +1,78 @@
|
||||
# Copyright 2026 jCloud
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from fastapi import FastAPI
|
||||
import os
|
||||
import uvicorn
|
||||
import ipaddress
|
||||
import jcloud_config_parser
|
||||
import logging
|
||||
from .arguments import parse_args
|
||||
from .config import load_config, process_host_and_port
|
||||
from .logging import setup_logging
|
||||
from ..integrations.gitea.middlewares.signature import GiteaSignatureMiddleware
|
||||
import sys
|
||||
|
||||
def main():
|
||||
args = parse_args(sys.argv[1:])
|
||||
|
||||
configuration, default_configuration = load_config(args.config)
|
||||
|
||||
logger_info = setup_logging(configuration)
|
||||
|
||||
host, port = process_host_and_port(
|
||||
configuration,
|
||||
default_configuration,
|
||||
args,
|
||||
logger_info.logger
|
||||
)
|
||||
|
||||
# Initialize FastAPI
|
||||
app = FastAPI(
|
||||
docs_url=configuration.docs.swagger,
|
||||
redoc_url=configuration.docs.redoc,
|
||||
openapi_url=configuration.docs.openapi
|
||||
)
|
||||
|
||||
app.add_middleware(GiteaSignatureMiddleware, secret = b'', logger = logger_info.logger)
|
||||
|
||||
uvicorn.run(
|
||||
app,
|
||||
host = host,
|
||||
port = port,
|
||||
server_header = False,
|
||||
log_config = {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': True,
|
||||
'handlers': {
|
||||
'uvicorn_handler': {
|
||||
'class': 'logging.FileHandler',
|
||||
'filename': str(logger_info.logfile),
|
||||
'formatter': 'uvicorn_formatter',
|
||||
},
|
||||
},
|
||||
'formatters': {
|
||||
'uvicorn_formatter': {
|
||||
'format': logger_info.format
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
'uvicorn': {
|
||||
'handlers': ['uvicorn_handler'],
|
||||
'level': logger_info.level,
|
||||
'propagate': False
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
Reference in New Issue
Block a user