# Copyright 2026 jCloud Services GbR
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dbm
import pickle
import os
import shutil
import time
import binascii
import io
import warnings
import re
from types import FunctionType, MethodType

from . import utils
from . import exceptions

# NOTE: every entry needs its own trailing comma; adjacent string literals
# are silently concatenated into a single (non-existent) name otherwise.
__all__ = [
    'TOPIC_PARTITION_SEPARATOR',
    'COMPRESSION_TYPE_NONE',
    'COMPRESSION_TYPE_GZIP',
    'COMPRESSION_TYPE_SNAPPY',
    'COMPRESSION_TYPE_LZ4',
    'COMPRESSION_TYPE_ZSTD',
    'FETCH_START_TYPE_OFFSET',
    'FETCH_START_TYPE_TIMESTAMP',
    'format_topic_partitions',
    'get_next_lower_index_entry',
    'validate_topic_partition',
    'validate_topic_name',
    'validate_topic_format',
    'pack_record_headers',
    'unpack_record_headers',
    'FileCorruptWarning',
    'AttributesByte',
]

TOPIC_PARTITION_SEPARATOR = ':'

_TOPIC_NAME_REGEX = re.compile(r"^[a-zA-Z0-9._-]+$")

COMPRESSION_TYPE_NONE = 0
COMPRESSION_TYPE_GZIP = 1
COMPRESSION_TYPE_SNAPPY = 2
COMPRESSION_TYPE_LZ4 = 3
COMPRESSION_TYPE_ZSTD = 4

FETCH_START_TYPE_OFFSET = 0xc0
FETCH_START_TYPE_TIMESTAMP = 0xc1


def format_topic_partitions(topic_name: str, partitions: set) -> list:
    '''
    Formats the topic partitions.

    Each partition number is zero-padded to at least two digits and appended
    to the topic name, separated by ``TOPIC_PARTITION_SEPARATOR``. The result
    follows the iteration order of ``partitions``.

    :param topic_name: The name of the topic, e. g. ``test-topic``
    :type topic_name: str
    :param partitions: The partitions, e. g. ``{1, 2, 4}``
    :type partitions: set
    :return: The partition names, e. g.
        ``['test-topic:01', 'test-topic:02', 'test-topic:04']``
    :rtype: list
    '''

    return [
        f'{topic_name}{TOPIC_PARTITION_SEPARATOR}{p:02d}'
        for p in partitions
    ]


def get_next_lower_index_entry(offset: int):
    '''
    Returns the result of ``utils.get_next_lower_integer_multiple`` for the
    given offset and a fixed step of ``1024``.

    NOTE(review): presumably this rounds the offset down to the closest
    multiple of 1024; the helper lives outside this module - confirm there.

    :param offset: The offset
    :type offset: int
    :return: Whatever ``utils.get_next_lower_integer_multiple`` returns
    '''

    return utils.get_next_lower_integer_multiple(offset, 1024)


def validate_topic_partition(partition_number: str | int) -> bool:
    '''
    Validates a topic partition number format.

    A string is valid only if it consists of exactly two ASCII digits; after
    conversion (or for an ``int`` argument) the number must be in the range
    ``1`` to ``99``.

    :param partition_number: The partition number
    :type partition_number: str | int
    :return: ``True``, if the partition number is valid, otherwise ``False``
    :rtype: bool
    '''

    if isinstance(partition_number, str):
        # str.isdigit() alone also accepts non-ASCII digits such as '²',
        # which int() refuses with a ValueError. Requiring ASCII keeps this
        # function a pure predicate that never raises on string input.
        is_two_ascii_digits = (
            len(partition_number) == 2
            and partition_number.isascii()
            and partition_number.isdigit()
        )
        if not is_two_ascii_digits:
            return False
        partition_number = int(partition_number)
    elif not isinstance(partition_number, int):
        return False

    return 0 < partition_number < 100


def validate_topic_name(topic_name: str) -> bool:
    '''
    Validates a topic name.

    A valid name is a string of 1 to 255 characters out of ``a-z``, ``A-Z``,
    ``0-9``, ``.``, ``_`` and ``-`` that neither starts nor ends with a dot.

    :param topic_name: The topic name
    :type topic_name: str
    :return: ``True``, if the topic name is valid, otherwise ``False``
    :rtype: bool
    '''

    if not isinstance(topic_name, str):
        return False

    # fullmatch() instead of match(): with match(), the trailing ``$`` of the
    # pattern also matches right before a final newline, so a name such as
    # ``"topic\n"`` would wrongly be accepted.
    if not _TOPIC_NAME_REGEX.fullmatch(topic_name):
        return False

    if len(topic_name) == 0 or len(topic_name) > 255:
        return False

    # A leading/trailing '..' always implies a leading/trailing '.', so the
    # single-dot checks cover both cases.
    if topic_name.startswith('.') or topic_name.endswith('.'):
        return False

    return True


def validate_topic_format(topic: str):
    '''
    Validates the ``<name><separator><partition>`` format of a topic.

    Only the overall shape is checked: exactly one
    ``TOPIC_PARTITION_SEPARATOR`` and a partition part made of exactly two
    ASCII digits. The name part is not validated here.

    :param topic: The formatted topic, e. g. ``test-topic:01``
    :type topic: str
    :raises ValueError: If the separator does not occur exactly once or the
        partition part is not made of two ASCII digits
    '''

    if topic.count(TOPIC_PARTITION_SEPARATOR) != 1:
        raise ValueError('invalid topic format')

    _, formatted_partition_number = topic.split(TOPIC_PARTITION_SEPARATOR)

    # isascii() guards against non-ASCII digits (e. g. '²²') that
    # str.isdigit() accepts; this keeps the check consistent with
    # validate_topic_partition().
    is_two_ascii_digits = (
        len(formatted_partition_number) == 2
        and formatted_partition_number.isascii()
        and formatted_partition_number.isdigit()
    )
    if not is_two_ascii_digits:
        raise ValueError('invalid partition number format')


def pack_record_headers(headers: dict) -> bytes:
    '''
    Packs the record header dictionary into bytes.

    Every entry is written as a 4-byte big-endian key length, the key, a
    4-byte big-endian value length and the value. ``str`` keys and values
    are encoded with ``str.encode()`` first; other objects are used as-is.

    :param headers: The header dictionary
    :type headers: dict
    :return: The bytes
    :rtype: bytes
    '''

    encoded_headers = {
        (k.encode() if isinstance(k, str) else k):
        (v.encode() if isinstance(v, str) else v)
        for k, v in headers.items()
    }

    # Collect the chunks and join once instead of repeated ``bytes +=``,
    # which copies the whole buffer on every step.
    chunks = []
    for key, value in encoded_headers.items():
        chunks.append(len(key).to_bytes(4, 'big'))
        chunks.append(key)
        chunks.append(len(value).to_bytes(4, 'big'))
        chunks.append(value)

    return b''.join(chunks)


def _unpack_record_headers(buffer: io.BytesIO, number: int = 256):
    '''
    Reads up to ``number`` header entries from the buffer.

    The layout mirrors :func:`pack_record_headers`. Reading stops silently
    at the first incomplete length field, key or value; the entries read
    completely up to that point are returned.

    :param buffer: The buffer positioned at the first header entry
    :type buffer: io.BytesIO
    :param number: The maximum number of entries to read
    :type number: int
    :return: The header dictionary (``bytes`` keys and values)
    :rtype: dict
    '''

    headers = {}

    for _ in range(number):
        key_length_bytes = buffer.read(4)
        if len(key_length_bytes) < 4:
            break
        key_length = int.from_bytes(key_length_bytes, 'big')

        key = buffer.read(key_length)
        if len(key) < key_length:
            break

        value_length_bytes = buffer.read(4)
        if len(value_length_bytes) < 4:
            break
        value_length = int.from_bytes(value_length_bytes, 'big')

        value = buffer.read(value_length)
        if len(value) < value_length:
            break

        headers[key] = value

    return headers


def unpack_record_headers(packed: bytes) -> dict:
    '''
    Unpacks the record headers from bytes.

    At most 256 entries are read (the default limit of the internal
    reader); keys and values are returned as ``bytes``.

    :param packed: The packed headers
    :type packed: bytes
    :return: The header dictionary
    :rtype: dict
    '''

    buffer = io.BytesIO(packed)
    return _unpack_record_headers(buffer)


class FileCorruptWarning(UserWarning):
    '''
    Warning category exported by this module.

    NOTE(review): presumably issued when a corrupt file is detected; it is
    not raised anywhere in this module - confirm against the callers.
    '''


class AttributesByte:
    '''
    Holds the compression type and the timestamp type that are stored
    together in one attributes byte.

    :param compression_type: The compression type; its three low bits are
        written by :meth:`to_byte`
    :type compression_type: int
    :param timestamp_type: The timestamp type, passed as a single bit to
        ``utils.bits_to_byte``
    :type timestamp_type: int
    '''

    def __init__(self, compression_type: int, timestamp_type: int):
        self.compression_type = compression_type
        self.timestamp_type = timestamp_type

    def to_byte(self):
        '''
        Builds the attributes byte from the stored values.

        :return: The result of ``utils.bits_to_byte`` - presumably a
            ``bytes`` object of length one (TODO confirm in ``utils``)
        '''

        return utils.bits_to_byte(
            0, 0, 0, 0,  # Unused bits
            self.timestamp_type,
            (self.compression_type >> 2) & 1,
            (self.compression_type >> 1) & 1,
            self.compression_type & 1,
        )

    @classmethod
    def from_byte(cls, b: bytes | int):
        '''
        Creates an instance from an attributes byte.

        :param b: The attributes byte; an ``int`` is converted to a
            one-element ``bytes`` object first
        :type b: bytes | int
        :return: The attributes
        :rtype: AttributesByte
        :raises ValueError: If the decoded compression type is greater than
            4 or the decoded timestamp type is greater than 1
        '''

        if isinstance(b, int):
            b = bytes([b])

        bits = utils.byte_to_bits(b)

        # The last three bits carry the compression type, the bit before
        # them the timestamp type (see to_byte()).
        compression_type = utils.bits_to_byte(*bits[-3:])[0]
        if compression_type > 4:
            raise ValueError('invalid compression type in attributes byte')

        timestamp_type = bits[-4]
        if timestamp_type > 1:
            raise ValueError('invalid timestamp type in attributes byte')

        return cls(compression_type, timestamp_type)