commit a0036b6767141fe1f2280e7c08d3b537185097be
Author: Jakob Scheid
Date:   Thu Feb 19 17:03:49 2026 +0100

    new file: .gitignore
    new file: README.md
    new file: pyproject.toml
    new file: src/jeb_utils/__init__.py
    new file: src/jeb_utils/auth_utils.py
    new file: src/jeb_utils/crypto_utils.py
    new file: src/jeb_utils/exceptions.py
    new file: src/jeb_utils/jeb_utils.py
    new file: src/jeb_utils/jebp_utils.py
    new file: src/jeb_utils/utils.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e178647
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+dist/
+src/*.egg-info/
+.vscode/
+.pytest_cache/
+__pycache__/
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4fcde34
--- /dev/null
+++ b/README.md
@@ -0,0 +1,87 @@
+# jeb Utils
+
+Common utils for the JEB client and server.
+
+## Installation
+You can install the library using `pip`:
+
+```bash
+pip install jeb-utils --index-url https://jcloud-services.ddns.net/simple/ --extra-index-url https://pypi.org/simple/
+```
+
+## Functions and classes
+### `auth_utils`: Utilities for the jeb authentication
+- `load_cert_file`: Loads a certificate from a file.
+- `Identity`: An identity, based on a certificate.
+- `Verifier`: A verifier for the CA of an identity.
+
+### `crypto_utils`: Utilities for cryptography
+- `generate_keypair`: Generates a private and a public key.
+- `serialize_public_key`: Serializes a public key to bytes.
+- `deserialize_public_key`: Deserializes a public key from bytes.
+- `derive_aes_key`: Derives an AES key from a private key and the peer public key.
+
+### `exceptions`: Common exceptions of the jeb library and server
+- `UnknownProtocolError`: The protocol is unknown.
+- `ProtocolSyntaxError`: The syntax is not valid for the protocol.
+- `CRCInvalidError`: The CRC is invalid.
+- `UnknownCreationError`: Unknown error creating an object.
+- `UnknownSubscribingError`: Unknown error subscribing topics.
+- `UnknownUnsubscribingError`: Unknown error unsubscribing topics.
+- `TopicNotFoundError`: The topic was not found.
+- `TopicExistsError`: The topic already exists.
+- `TopicPartitionExistsError`: The partition of the topic already exists.
+- `TopicPartitionNotFoundError`: The partition of the topic was not found.
+- `TopicNotSubscribedError`: The topic is not subscribed.
+- `InvalidTopicNameError`: The topic name is invalid.
+- `FetchStartOutOfRangeError`: The fetch start is out of range.
+- `FormatError`: The format is invalid.
+- `CertificateNotTrustedError`: The certificate is not trusted.
+
+### `jeb_utils`: Functions for jeb actions
+- `format_topic_partitions`: Formats the topic partitions.
+- `mktopic`: Creates a topic.
+- `rmtopic`: Removes a topic.
+- `mktopicpart`: Creates a partition of a topic.
+- `rmtopicpart`: Removes a partition of a topic.
+- `get_topics`: Returns all topics.
+- `get_segment_base_timestamp`: Returns the base timestamp of a segment.
+- `get_log_end_offsets`: Gets the end offsets.
+- `init`: Initializes the module.
+- `validate_topic_partition`: Validates a topic partition number format.
+- `validate_topic_name`: Validates a topic name.
+- `check_topic_exists`: Checks whether a topic exists.
+- `pack_record_headers`: Packs the record header dictionary into bytes.
+- `unpack_record_headers`: Unpacks the record headers from bytes.
+- `create_record`: Creates a record.
+- `fetch_records`: Fetches records from the topic.
+- `Topic`: A topic.
+- `Segment`: A segment of a topic.
+- `FileCorruptWarning`: A warning for corrupt log files.
+- `AttributesByte`: An attributes byte of a record.
+
+### `jebp_utils`: Utilities for the jeb protocol
+- `MessageFormatError`: Raised if the message format is invalid.
+- `sendmsg`: Sends a message.
+- `readmsg`: Receives a message.
+- `pack_fields`: Packs the fields into a compact bytestring.
+- `unpack_fields`: Unpacks the fields from a bytestring.
+
+### `utils`: General utilities
+- `is_number`: Checks whether the string is a number.
+- `int_to_bytes`: Converts the integer into bytes (dynamic length).
+- `bits_to_byte`: Converts the bits into a single byte.
+- `byte_to_bits`: Converts the byte to a tuple of bits.
+- `get_next_lower_integer_multiple`: Returns the next lower integer multiple.
+- `find_nearest_lower_number`: Finds the nearest lower number to the target from the list.
+- `create_file_if_not_exists`: Creates a file if it does not exist.
+
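+## Usage
+A minimal sketch of deriving a shared AES key with `crypto_utils` (illustrative only; in practice the serialized public keys are exchanged over the network):
+
+```python
+from jeb_utils import crypto_utils
+
+# Each side generates an ephemeral EC keypair.
+private_key, public_key = crypto_utils.generate_keypair()
+
+# Public keys travel as X9.62 uncompressed points.
+wire_bytes = crypto_utils.serialize_public_key(public_key)
+peer_public_key = crypto_utils.deserialize_public_key(wire_bytes)
+
+# Both sides derive the same 32-byte AES key via ECDH + HKDF.
+aes_key = crypto_utils.derive_aes_key(private_key, peer_public_key)
+assert len(aes_key) == 32
+```
+
+Packing and unpacking length-prefixed protocol fields with `jebp_utils`:
+
+```python
+from jeb_utils import jebp_utils
+
+packed = jebp_utils.pack_fields(b'topic-a', b'payload')
+print(jebp_utils.unpack_fields(packed))  # [b'topic-a', b'payload']
+```
+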
+## Changelog
+### Version 0.1.0
+- First release
+- `auth_utils`
+- `crypto_utils`
+- `exceptions`
+- `jeb_utils`
+- `jebp_utils`
+- `utils`
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..4bf604c
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,9 @@
+[build-system]
+requires = ["setuptools", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "jeb-utils"
+version = "0.1.0"
+description = "Common utils for JEB client and server."
+dependencies = ["cryptography"]
\ No newline at end of file
diff --git a/src/jeb_utils/__init__.py b/src/jeb_utils/__init__.py
new file mode 100644
index 0000000..fdc313a
--- /dev/null
+++ b/src/jeb_utils/__init__.py
@@ -0,0 +1,19 @@
+'''
+The utils for jeb.
+'''
+
+from . import auth_utils
+from . import crypto_utils
+from . import exceptions
+from . import jeb_utils
+from . import jebp_utils
+from . import utils
+
+__all__ = [
+    'auth_utils',
+    'crypto_utils',
+    'exceptions',
+    'jeb_utils',
+    'jebp_utils',
+    'utils',
+]
\ No newline at end of file
diff --git a/src/jeb_utils/auth_utils.py b/src/jeb_utils/auth_utils.py
new file mode 100644
index 0000000..cfe8181
--- /dev/null
+++ b/src/jeb_utils/auth_utils.py
@@ -0,0 +1,74 @@
+from cryptography.hazmat.primitives.asymmetric import padding
+from cryptography import x509
+import datetime
+
+__all__ = [
+    'load_cert_file',
+    'Identity',
+    'Verifier'
+]
+
+def load_cert_file(path: str) -> x509.Certificate:
+    '''
+    Loads a certificate from a file.
+
+    :param path: File path
+    :type path: str
+    :return: The certificate
+    :rtype: cryptography.x509.Certificate
+    '''
+    with open(path, 'rb') as f:
+        cert_data = f.read()
+    return x509.load_pem_x509_certificate(cert_data)
+
+class Identity:
+    def __init__(self, cert: x509.Certificate = None):
+        self.cert = cert
+
+    @property
+    def public_key(self):
+        return self.cert.public_key()
+
+    @property
+    def issuer(self):
+        return self.cert.issuer
+
+    @property
+    def subject(self):
+        return self.cert.subject
+
+    @property
+    def not_valid_before(self):
+        return self.cert.not_valid_before_utc
+
+    @property
+    def not_valid_after(self):
+        return self.cert.not_valid_after_utc
+
+    @property
+    def signature_algorithm(self):
+        return self.cert.signature_hash_algorithm
+
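+# Example (sketch): 'ca.pem' and 'peer.pem' are placeholder paths.
+#
+#     ca = Identity(load_cert_file('ca.pem'))
+#     peer = Identity(load_cert_file('peer.pem'))
+#     Verifier(ca).verify(peer)  # True only if peer is currently valid and CA-signed
+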
+class Verifier:
+    def __init__(self, trusted_ca: Identity):
+        self.trusted_ca = trusted_ca
+
+    def verify(self, identity: Identity, now: datetime.datetime = None, verify_issuer: bool = True):
+        # The current time is taken inside the function body, because a
+        # default argument would be evaluated only once at import time.
+        if now is None:
+            now = datetime.datetime.now(datetime.timezone.utc)
+
+        if not (identity.not_valid_before <= now <= identity.not_valid_after):
+            return False
+
+        try:
+            self.trusted_ca.public_key.verify(
+                identity.cert.signature,
+                identity.cert.tbs_certificate_bytes,
+                padding.PKCS1v15(),
+                identity.cert.signature_hash_algorithm
+            )
+        except Exception:
+            return False
+
+        if verify_issuer and identity.issuer != self.trusted_ca.subject:
+            return False
+
+        return True
\ No newline at end of file
diff --git a/src/jeb_utils/crypto_utils.py b/src/jeb_utils/crypto_utils.py
new file mode 100644
index 0000000..d2a5a59
--- /dev/null
+++ b/src/jeb_utils/crypto_utils.py
@@ -0,0 +1,35 @@
+from cryptography.hazmat.primitives.asymmetric import ec
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.kdf.hkdf import HKDF
+
+__all__ = [
+    'generate_keypair',
+    'serialize_public_key',
+    'deserialize_public_key',
+    'derive_aes_key'
+]
+
+def generate_keypair():
+    private_key = ec.generate_private_key(ec.SECP256R1())
+    return private_key, private_key.public_key()
+
+def serialize_public_key(public_key: ec.EllipticCurvePublicKey) -> bytes:
+    return public_key.public_bytes(
+        encoding=serialization.Encoding.X962,
+        format=serialization.PublicFormat.UncompressedPoint,
+    )
+
+def deserialize_public_key(data: bytes) -> ec.EllipticCurvePublicKey:
+    return ec.EllipticCurvePublicKey.from_encoded_point(
+        ec.SECP256R1(), data
+    )
+
+def derive_aes_key(private_key, peer_public_key) -> bytes:
+    shared_secret = private_key.exchange(ec.ECDH(), peer_public_key)
+
+    return HKDF(
+        algorithm=hashes.SHA256(),
+        length=32,
+        salt=None,
+        info=b'handshake',
+    ).derive(shared_secret)
\ No newline at end of file
diff --git a/src/jeb_utils/exceptions.py b/src/jeb_utils/exceptions.py
new file mode 100644
index 0000000..8cb8946
--- /dev/null
+++ b/src/jeb_utils/exceptions.py
@@ -0,0 +1,38 @@
+__all__ = [
+    'UnknownProtocolError',
+    'ProtocolSyntaxError',
+    'CRCInvalidError',
+    'UnknownCreationError',
+    'UnknownSubscribingError',
+    'UnknownUnsubscribingError',
+    'TopicNotFoundError',
+    'TopicExistsError',
+    'TopicPartitionExistsError',
+    'TopicPartitionNotFoundError',
+    'TopicNotSubscribedError',
+    'InvalidTopicNameError',
+    'FetchStartOutOfRangeError',
+    'FormatError',
+    'CertificateNotTrustedError',
+]
+
+class UnknownProtocolError(Exception): ...
+class ProtocolSyntaxError(Exception): ...
+class CRCInvalidError(Exception): ...
+
+class UnknownCreationError(Exception): ...
+
+class UnknownSubscribingError(Exception): ...
+class UnknownUnsubscribingError(Exception): ...
+
+class TopicNotFoundError(Exception): ...
+class TopicExistsError(Exception): ...
+class TopicPartitionExistsError(Exception): ...
+class TopicPartitionNotFoundError(Exception): ...
+class TopicNotSubscribedError(Exception): ...
+class InvalidTopicNameError(Exception): ...
+
+class FetchStartOutOfRangeError(Exception): ...
+class FormatError(Exception): ...
+
+class CertificateNotTrustedError(Exception): ...
\ No newline at end of file
diff --git a/src/jeb_utils/jeb_utils.py b/src/jeb_utils/jeb_utils.py
new file mode 100644
index 0000000..756a931
--- /dev/null
+++ b/src/jeb_utils/jeb_utils.py
@@ -0,0 +1,649 @@
+import dbm
+import pickle
+import os
+import shutil
+import time
+import binascii
+import io
+import warnings
+import re
+from types import FunctionType, MethodType
+from . import utils
+from . import exceptions
+
+__all__ = [
+    'TOPIC_PARTITION_SEPARATOR',
+    'COMPRESSION_TYPE_NONE',
+    'COMPRESSION_TYPE_GZIP',
+    'COMPRESSION_TYPE_SNAPPY',
+    'COMPRESSION_TYPE_LZ4',
+    'COMPRESSION_TYPE_ZSTD',
+    'FETCH_START_TYPE_OFFSET',
+    'FETCH_START_TYPE_TIMESTAMP',
+    'FETCH_RECORDS_CHUNK_SIZE',
+    'SEGMENT_MAX_SIZE',
+    'format_topic_partitions',
+    'mktopic',
+    'rmtopic',
+    'mktopicpart',
+    'rmtopicpart',
+    'get_topics',
+    'get_segment_base_timestamp',
+    'get_log_end_offsets',
+    'init',
+    'get_next_lower_index_entry',
+    'validate_topic_partition',
+    'validate_topic_name',
+    'validate_topic_format',
+    'check_topic_exists',
+    'pack_record_headers',
+    'unpack_record_headers',
+    'create_record',
+    'fetch_records',
+    'Topic',
+    'Segment',
+    'FileCorruptWarning',
+    'AttributesByte'
+]
+
+TOPIC_PARTITION_SEPARATOR = ':'
+_TOPIC_NAME_REGEX = re.compile(r"^[a-zA-Z0-9._-]+$")
+COMPRESSION_TYPE_NONE = 0
+COMPRESSION_TYPE_GZIP = 1
+COMPRESSION_TYPE_SNAPPY = 2
+COMPRESSION_TYPE_LZ4 = 3
+COMPRESSION_TYPE_ZSTD = 4
+FETCH_START_TYPE_OFFSET = 0xc0
+FETCH_START_TYPE_TIMESTAMP = 0xc1
+FETCH_RECORDS_CHUNK_SIZE = 1024 ** 2  # 1 MiB
+SEGMENT_MAX_SIZE = None
+
+log_end_offsets = {}
+
+def format_topic_partitions(topic_name: str, partitions: set) -> list:
+    '''
+    Formats the topic partitions.
+
+    :param topic_name: The name of the topic, e.g. ``test-topic``
+    :type topic_name: str
+    :param partitions: The partitions, e.g. ``{1, 2, 4}``
+    :type partitions: set
+
+    :return: The partition names, e.g. ``['test-topic:01', 'test-topic:02', 'test-topic:04']``
+    :rtype: list
+    '''
+    return [f'{topic_name}{TOPIC_PARTITION_SEPARATOR}{p:02d}' for p in partitions]
+
+def mktopic(topic_name: str):
+    '''
+    Creates a topic.
+
+    :param topic_name: The name of the topic
+    :type topic_name: str
+
+    :raises TopicExistsError: If the topic already exists.
+    '''
+    with dbm.open('data/conf/topics', 'c') as db:
+        if topic_name.encode() in db.keys():
+            raise exceptions.TopicExistsError(f'topic \'{topic_name}\' already exists')
+        db[topic_name] = pickle.dumps({'partitions': {1}})
+
+    os.mkdir(f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}01/')
+
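+# Example (sketch): 'demo' is a placeholder topic name. mktopic() creates the
+# topic with partition 01; mktopicpart() (below) adds further partitions.
+#
+#     mktopic('demo')           # creates data/topics/demo:01/
+#     mktopicpart('demo', 2)    # adds data/topics/demo:02/
+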
+def rmtopic(topic_name: str):
+    '''
+    Removes a topic.
+
+    :param topic_name: The topic name
+    :type topic_name: str
+
+    :raises TopicNotFoundError: If the topic does not exist.
+    '''
+    with dbm.open('data/conf/topics', 'c') as db:
+        if topic_name.encode() not in db.keys():
+            raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
+        partitions = pickle.loads(db[topic_name.encode()])['partitions']
+        del db[topic_name]
+
+    partition_dirs = format_topic_partitions(topic_name, partitions)
+    for pd in partition_dirs:
+        try:
+            shutil.rmtree(f'data/topics/{pd}/')
+        except OSError:
+            pass
+
+def mktopicpart(topic_name: str, partition_number: int):
+    '''
+    Creates a partition of a topic.
+
+    :param topic_name: The topic name
+    :type topic_name: str
+    :param partition_number: The partition number
+    :type partition_number: int
+
+    :raises TopicNotFoundError: If the topic does not exist.
+    :raises TopicPartitionExistsError: If the partition already exists.
+    '''
+    if partition_number < 1 or partition_number > 99:
+        raise ValueError('partition number must be between 1 and 99')
+    formatted_partition_number = f'{(int(partition_number)):02d}'
+    with dbm.open('data/conf/topics', 'c') as db:
+        if topic_name.encode() not in db.keys():
+            raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
+        partitions = pickle.loads(db[topic_name.encode()])['partitions']
+        if partition_number in partitions:
+            raise exceptions.TopicPartitionExistsError(f'partition \'{formatted_partition_number}\' already exists for topic \'{topic_name}\'')
+        db[topic_name] = pickle.dumps({'partitions': partitions | {int(formatted_partition_number)}})
+
+    os.mkdir(f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')
+
+def rmtopicpart(topic_name: str, partition_number: int):
+    '''
+    Removes a partition of a topic.
+
+    :param topic_name: The topic name
+    :type topic_name: str
+    :param partition_number: The partition number
+    :type partition_number: int
+
+    :raises TopicNotFoundError: If the topic does not exist.
+    :raises TopicPartitionNotFoundError: If the partition does not exist.
+    '''
+    formatted_partition_number = f'{(int(partition_number)):02d}'
+    with dbm.open('data/conf/topics', 'c') as db:
+        if topic_name.encode() not in db.keys():
+            raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
+        partitions = pickle.loads(db[topic_name.encode()])['partitions']
+        if partition_number not in partitions:
+            raise exceptions.TopicPartitionNotFoundError(f'partition \'{formatted_partition_number}\' does not exist for topic \'{topic_name}\'')
+        db[topic_name] = pickle.dumps({'partitions': partitions - {int(formatted_partition_number)}})
+
+    shutil.rmtree(f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')
+
+def get_topics():
+    '''
+    Returns all topics.
+
+    :return: The topics
+    :rtype: dict
+    '''
+    with dbm.open('data/conf/topics', 'c') as db:
+        topics = {k.decode(): pickle.loads(v) for k, v in db.items()}
+    return topics
+
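+# Example (sketch): the returned mapping mirrors what mktopic() and
+# mktopicpart() persist, e.g.
+#
+#     >>> get_topics()
+#     {'demo': {'partitions': {1, 2}}}
+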
+def _create_base_segment_files(topics: dict):
+    for topic, v in topics.items():
+        partitions = format_topic_partitions(topic, v['partitions'])
+        for p in partitions:
+            utils.create_file_if_not_exists(f'data/topics/{p}/' + '0' * 16 + '.log')
+            utils.create_file_if_not_exists(f'data/topics/{p}/' + '0' * 16 + '.index')
+            utils.create_file_if_not_exists(f'data/topics/{p}/' + '0' * 16 + '.timeindex')
+
+def get_segment_base_timestamp(topic: str, segment: int) -> int:
+    '''
+    Returns the base timestamp of a segment.
+
+    :param topic: The topic
+    :type topic: str
+    :param segment: The segment base offset
+    :type segment: int
+
+    :return: The base timestamp (milliseconds since 1970)
+    :rtype: int
+    '''
+    with open(f'data/topics/{topic}/{segment:016x}.log', 'rb') as logfile:
+        # Go to the topic name length field.
+        logfile.seek(20)
+
+        # Skip the topic name and the attributes byte.
+        topic_length = int.from_bytes(logfile.read(2))
+        logfile.seek(topic_length + 1, os.SEEK_CUR)
+
+        timestamp_bytes = logfile.read(8)
+        if len(timestamp_bytes) < 8:
+            timestamp = None
+        else:
+            timestamp = int.from_bytes(timestamp_bytes)
+
+    return timestamp
+
+def get_log_end_offsets(topics: dict):
+    _topics = topics
+    topics = [f'{t}{TOPIC_PARTITION_SEPARATOR}{p:02d}' for t, v in _topics.items() for p in v['partitions']]
+    topic_segments = {t: Topic(t).segments for t in topics}
+    for topic, segments in topic_segments.items():
+        log_end_offset = 0
+        for segment in segments:
+            with open(f'data/topics/{topic}/{segment.base_offset:016x}.log', 'rb') as logfile:
+                logfile.seek(0, os.SEEK_END)
+                file_size = logfile.tell()
+                logfile.seek(0)
+
+                # Count complete records; a truncated trailing record is ignored.
+                while True:
+                    record_size_bytes = logfile.read(8)
+                    if not record_size_bytes or len(record_size_bytes) < 8:
+                        break
+                    record_size = int.from_bytes(record_size_bytes)
+                    if logfile.tell() + record_size > file_size:
+                        break
+                    logfile.seek(record_size, os.SEEK_CUR)
+                    log_end_offset += 1
+        log_end_offsets[topic] = log_end_offset
+
+def init(config: dict = {'segment_max_size': 1024 * 256}):
+    '''
+    Initializes the module. Calling this function is required before using most of the functions of the module.
+
+    :param config: The configuration
+    :type config: dict
+    '''
+    global SEGMENT_MAX_SIZE
+    SEGMENT_MAX_SIZE = config.get('segment_max_size', SEGMENT_MAX_SIZE)
+
+    topics = get_topics()
+    _create_base_segment_files(topics)
+    get_log_end_offsets(topics)
+
+def get_next_lower_index_entry(offset: int):
+    return utils.get_next_lower_integer_multiple(offset, 1024)
+
+def validate_topic_partition(partition_number: str | int) -> bool:
+    '''
+    Validates a topic partition number format.
+
+    :param partition_number: The partition number
+    :type partition_number: str | int
+
+    :return: ``True``, if the partition number is valid, otherwise ``False``
+    :rtype: bool
+    '''
+    if isinstance(partition_number, str):
+        if len(partition_number) != 2 or not partition_number.isdigit():
+            return False
+        partition_number = int(partition_number)
+
+    elif not isinstance(partition_number, int):
+        return False
+
+    if not (0 < partition_number < 100):
+        return False
+
+    return True
+
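+# Example (sketch): partition numbers are ints or two-digit strings in 1..99.
+#
+#     >>> validate_topic_partition('07')
+#     True
+#     >>> validate_topic_partition('7')    # strings must be zero-padded
+#     False
+#     >>> validate_topic_partition(100)
+#     False
+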
+def validate_topic_name(topic_name: str) -> bool:
+    '''
+    Validates a topic name.
+
+    :param topic_name: The topic name
+    :type topic_name: str
+
+    :return: ``True``, if the topic name is valid, otherwise ``False``
+    :rtype: bool
+    '''
+    if not isinstance(topic_name, str):
+        return False
+
+    if not _TOPIC_NAME_REGEX.match(topic_name):
+        return False
+
+    if len(topic_name) == 0 or len(topic_name) > 255:
+        return False
+
+    if topic_name.startswith('.') or topic_name.endswith('.'):
+        return False
+
+    return True
+
+def validate_topic_format(topic: str):
+    if topic.count(TOPIC_PARTITION_SEPARATOR) != 1:
+        raise ValueError('invalid topic format')
+    _, formatted_partition_number = topic.split(TOPIC_PARTITION_SEPARATOR)
+    if len(formatted_partition_number) != 2 or not formatted_partition_number.isdigit():
+        raise ValueError('invalid partition number format')
+
+def check_topic_exists(topic: str):
+    '''
+    Checks whether a topic exists.
+
+    :param topic: The topic name
+    :type topic: str
+
+    :raises TopicNotFoundError: If the topic does not exist
+    :raises TopicPartitionNotFoundError: If the partition does not exist
+    '''
+    topics = get_topics()
+    topic_name, formatted_partition_number = topic.split(TOPIC_PARTITION_SEPARATOR)
+    raw_partition_number = int(formatted_partition_number)
+    if topic_name not in topics:
+        raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
+    if raw_partition_number not in topics[topic_name]['partitions']:
+        raise exceptions.TopicPartitionNotFoundError(f'partition \'{formatted_partition_number}\' does not exist for topic \'{topic_name}\'')
+
+def pack_record_headers(headers: dict) -> bytes:
+    '''
+    Packs the record header dictionary into bytes.
+
+    :param headers: The header dictionary
+    :type headers: dict
+
+    :return: The bytes
+    :rtype: bytes
+    '''
+    packed = b''
+    headers = {(k.encode() if isinstance(k, str) else k): (v.encode() if isinstance(v, str) else v) for k, v in headers.items()}
+    for k, v in headers.items():
+        packed += len(k).to_bytes(4)
+        packed += k
+        packed += len(v).to_bytes(4)
+        packed += v
+    return packed
+
+def _unpack_record_headers(buffer: io.BytesIO, number: int = 256):
+    headers = {}
+    for _ in range(number):
+        key_length_bytes = buffer.read(4)
+        if len(key_length_bytes) < 4:
+            break
+        key_length = int.from_bytes(key_length_bytes)
+
+        key = buffer.read(key_length)
+        if len(key) < key_length:
+            break
+
+        value_length_bytes = buffer.read(4)
+        if len(value_length_bytes) < 4:
+            break
+        value_length = int.from_bytes(value_length_bytes)
+
+        value = buffer.read(value_length)
+        if len(value) < value_length:
+            break
+
+        headers[key] = value
+    return headers
+
+def unpack_record_headers(packed: bytes) -> dict:
+    '''
+    Unpacks the record headers from bytes.
+
+    :param packed: The packed headers
+    :type packed: bytes
+
+    :return: The header dictionary
+    :rtype: dict
+    '''
+    buffer = io.BytesIO(packed)
+    return _unpack_record_headers(buffer)
+
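+# Example (sketch): headers are packed as repeated
+# <key length: 4 bytes><key><value length: 4 bytes><value> entries;
+# string keys and values are encoded to bytes on packing.
+#
+#     >>> packed = pack_record_headers({'trace-id': b'42'})
+#     >>> unpack_record_headers(packed)
+#     {b'trace-id': b'42'}
+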
+def create_record(topic: str, timestamp: int, record_data: bytes, key: bytes = b'', compression_type: int = 0, headers: dict = None):
+    '''
+    Creates a record.
+
+    :param topic: The topic name
+    :type topic: str
+    :param timestamp: The timestamp (milliseconds since 1970); ``0`` means the current time.
+    :type timestamp: int
+    :param record_data: The payload
+    :type record_data: bytes
+    :param key: The key
+    :type key: bytes
+    :param compression_type: The compression type
+    :type compression_type: int
+    :param headers: The header dictionary
+    :type headers: dict
+    '''
+    if headers is None:
+        headers = {}
+    validate_topic_format(topic)
+    topic_name, formatted_partition_number = topic.split(TOPIC_PARTITION_SEPARATOR)
+
+    check_topic_exists(topic)
+
+    topic_dir = f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/'
+    utils.create_file_if_not_exists(topic_dir + '0' * 16 + '.log')
+    segments = sorted([int(f.split('.')[0], 16) for f in os.listdir(topic_dir) if f.endswith('.log') and utils.is_number(f.split('.')[0], 16) and len(f.split('.')[0]) == 16])
+    last_segment = max(segments)
+
+    if timestamp == 0:
+        timestamp = int(time.time() * 1000)
+        timestamp_type = 1
+    else:
+        timestamp_type = 0
+
+    # Assemble the record
+    record = b''
+    record += log_end_offsets[topic].to_bytes(8)  # Offset
+    record += binascii.crc32(record_data).to_bytes(4)  # CRC
+    record += (len(topic)).to_bytes(2)  # Topic name length
+    record += topic.encode()  # Topic
+    record += AttributesByte(compression_type, timestamp_type).to_byte()  # Attributes
+    record += timestamp.to_bytes(8)  # Timestamp
+    record += (len(key)).to_bytes(4)  # Key length
+    record += key  # Key
+    record += (len(record_data)).to_bytes(4)  # Content length
+    record += record_data  # Content
+    record += (len(headers)).to_bytes(1)  # Header count
+    record += pack_record_headers(headers)  # Headers
+
+    record = len(record).to_bytes(8) + record  # Prepend record size
+
+    if os.path.getsize(topic_dir + f'{last_segment:016x}.log') + len(record) > SEGMENT_MAX_SIZE:
+        # Roll over to a new segment whose base offset is the next offset.
+        last_segment = log_end_offsets[topic]
+
+    with open(topic_dir + f'{last_segment:016x}.log', 'ab') as logfile:
+        logfile.write(record)
+
+    utils.create_file_if_not_exists(topic_dir + f'{last_segment:016x}.index')
+    utils.create_file_if_not_exists(topic_dir + f'{last_segment:016x}.timeindex')
+
+    if log_end_offsets[topic] % 1024 == 0:
+        # Write one sparse index entry for every 1024th offset.
+        index = log_end_offsets[topic].to_bytes(8) + os.path.getsize(topic_dir + f'{last_segment:016x}.log').to_bytes(4)
+        timeindex = timestamp.to_bytes(8) + log_end_offsets[topic].to_bytes(8)
+        with open(topic_dir + f'{last_segment:016x}.index', 'ab') as idxfile:
+            idxfile.write(index)
+        with open(topic_dir + f'{last_segment:016x}.timeindex', 'ab') as timeidxfile:
+            timeidxfile.write(timeindex)
+
+    log_end_offsets[topic] += 1
+
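+# Example (sketch): appending a record to partition 01 of the placeholder
+# topic 'demo' (init() and mktopic('demo') must have run first; timestamp 0
+# means "use the current time").
+#
+#     create_record('demo:01', 0, b'hello', key=b'k1',
+#                   headers={'content-type': b'text/plain'})
+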
+async def fetch_records(topic: str, start_type: int, start: int, max_bytes: int, send_block_function: FunctionType | MethodType):
+    '''
+    Fetches records from the topic and sends them using ``send_block_function``.
+
+    :param topic: The topic name
+    :type topic: str
+    :param start_type: The start type (``FETCH_START_TYPE_OFFSET`` or ``FETCH_START_TYPE_TIMESTAMP``)
+    :type start_type: int
+    :param start: The start offset or timestamp
+    :type start: int
+    :param max_bytes: The maximum number of bytes to fetch
+    :type max_bytes: int
+    :param send_block_function: The function used to send the chunks
+    :type send_block_function: FunctionType | MethodType
+    '''
+    topic = Topic(topic)
+    validate_topic_format(topic.topic_name)
+    if start_type not in (FETCH_START_TYPE_OFFSET, FETCH_START_TYPE_TIMESTAMP):
+        raise ValueError('invalid start type')
+
+    check_topic_exists(topic.topic_name)
+    segments = [int(f.split('.')[0], 16) for f in os.listdir(f'data/topics/{topic.topic_name}/') if utils.is_number(f.split('.')[0], 16) and len(f.split('.')[0]) == 16 and f.endswith('.log')]
+
+    if start_type == FETCH_START_TYPE_TIMESTAMP:
+        # Resolve the timestamp to an offset via the segment's time index.
+        segment_base_timestamps = {s: s.base_timestamp for s in topic.segments}
+        s = {v: k for k, v in segment_base_timestamps.items()}[utils.find_nearest_lower_number(list(segment_base_timestamps.values()), start) or min(segment_base_timestamps.values())]
+        with open(f'data/topics/{topic.topic_name}/{s.base_offset:016x}.timeindex', 'rb') as f:
+            timestamps = {}
+            while True:
+                timestamp_bytes = f.read(8)
+                if len(timestamp_bytes) < 8:
+                    break
+                timestamp = int.from_bytes(timestamp_bytes)
+
+                offset_bytes = f.read(8)
+                if len(offset_bytes) < 8:
+                    break
+                offset = int.from_bytes(offset_bytes)
+
+                timestamps[timestamp] = offset
+        start = timestamps[utils.find_nearest_lower_number(timestamps.keys(), start) or min(timestamps.keys())]
+
+    if start < 0 or start >= log_end_offsets[topic.topic_name]:
+        await send_block_function(b'\xb1\x31')  # Fetch start out of range
+        return
+    idx_offset = get_next_lower_index_entry(start)
+
+    start_seg = utils.find_nearest_lower_number(segments, start)
+
+    with open(f'data/topics/{topic.topic_name}/' + f'{start_seg:016x}' + '.index', 'rb') as idxfile:
+        idxfile.seek(idx_offset // 1024 * 12)
+        index_entry = idxfile.read(12)
+        idx_entry_pos = int.from_bytes(index_entry[8:])
+        if int.from_bytes(index_entry[:8]) != idx_offset:
+            warnings.warn(f'Index entry offset mismatch: expected {idx_offset}, got {int.from_bytes(index_entry[:8])}', FileCorruptWarning)
+
+    with open(f'data/topics/{topic.topic_name}/{start_seg:016x}.log', 'rb') as logfile:
+        # Scan forward from the index entry to the exact record position.
+        logfile.seek(idx_entry_pos)
+        offset = idx_offset
+        record_position = 0
+        while offset < start:
+            record_length_bytes = logfile.read(8)
+            if len(record_length_bytes) < 8:
+                break
+            record_length = int.from_bytes(record_length_bytes)
+
+            offset_bytes = logfile.read(8)
+            if len(offset_bytes) < 8:
+                break
+            offset = int.from_bytes(offset_bytes)
+
+            record_position = logfile.tell() - 16
+            logfile.seek(record_length - 8, os.SEEK_CUR)
+
+    await send_block_function(b'\xa0\x30')  # Acknowledgement; record blocks follow
+
+    segments = sorted(segments)
+
+    bytes_fetched = 0
+    for i, segment in enumerate(segments[segments.index(start_seg):]):
+        with open(f'data/topics/{topic.topic_name}/' + f'{segment:016x}' + '.log', 'rb') as logfile:
+            if i == 0:
+                logfile.seek(record_position)
+            else:
+                logfile.seek(0)
+            while bytes_fetched <= max_bytes:
+                record_size_bytes = logfile.read(8)
+                if len(record_size_bytes) < 8:
+                    break
+                record_size = int.from_bytes(record_size_bytes)
+                if bytes_fetched + 8 + record_size > max_bytes:
+                    break
+
+                # Stream the record in chunks of FETCH_RECORDS_CHUNK_SIZE.
+                to_send = record_size_bytes
+                for _ in range(record_size // FETCH_RECORDS_CHUNK_SIZE):
+                    to_send += logfile.read(FETCH_RECORDS_CHUNK_SIZE)
+                    await send_block_function(b'\x01' + to_send)
+                    to_send = b''
+                to_send += logfile.read(record_size % FETCH_RECORDS_CHUNK_SIZE)
+                await send_block_function(b'\x01' + to_send)
+
+                bytes_fetched += 8 + record_size
+
+        if bytes_fetched >= max_bytes:
+            break
+
+    await send_block_function(b'\x00')  # EOF
+
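+# Example (sketch): fetching up to 64 KiB starting at offset 0, inside an
+# event loop; 'collect' is a placeholder for a protocol send function.
+#
+#     async def collect(block: bytes):
+#         print(block[:1], len(block) - 1)
+#
+#     await fetch_records('demo:01', FETCH_START_TYPE_OFFSET, 0, 65536, collect)
+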
+class Topic:
+    def __init__(self, topic_name: str):
+        self.topic_name = topic_name
+
+    def __repr__(self):
+        return f'Topic({self.topic_name})'
+
+    @property
+    def segments(self):
+        return [Segment(self.topic_name, bo) for bo in sorted([int(s.split('.')[0], 16) for s in os.listdir(f'data/topics/{self.topic_name}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16])]
+
+class Segment:
+    def __init__(self, topic: str, base_offset: int):
+        self.topic = topic
+        self.base_offset = base_offset
+        self.base_timestamp = self.get_base_timestamp()
+
+    def __repr__(self):
+        return f'Segment({self.topic}, {self.base_offset})'
+
+    def get_base_timestamp(self):
+        with open(f'data/topics/{self.topic}/{self.base_offset:016x}.timeindex', 'rb') as timeindexfile:
+            timestamp_bytes = timeindexfile.read(8)
+            if len(timestamp_bytes) < 8:
+                timestamp = None
+            else:
+                timestamp = int.from_bytes(timestamp_bytes)
+
+        return timestamp
+
+class FileCorruptWarning(UserWarning): ...
+
+class AttributesByte:
+    def __init__(self, compression_type: int, timestamp_type: int):
+        self.compression_type = compression_type
+        self.timestamp_type = timestamp_type
+
+    def to_byte(self):
+        return utils.bits_to_byte(
+            0, 0, 0, 0,  # Unused bits
+            self.timestamp_type,
+            (self.compression_type >> 2) & 1,
+            (self.compression_type >> 1) & 1,
+            self.compression_type & 1,
+        )
+
+    @classmethod
+    def from_byte(cls, b: bytes | int):
+        if isinstance(b, int):
+            b = bytes([b])
+
+        bits = utils.byte_to_bits(b)
+        compression_type = utils.bits_to_byte(*bits[-3:])[0]
+        if compression_type > 4:
+            raise ValueError('invalid compression type in attributes byte')
+        timestamp_type = bits[-4]
+        return cls(compression_type, timestamp_type)
\ No newline at end of file
diff --git a/src/jeb_utils/jebp_utils.py b/src/jeb_utils/jebp_utils.py
new file mode 100644
index 0000000..4c1253a
--- /dev/null
+++ b/src/jeb_utils/jebp_utils.py
@@ -0,0 +1,116 @@
+import asyncio
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+import io
+from . import utils
+from typing import Sequence
+
+__all__ = [
+    'MessageFormatError',
+    'sendmsg',
+    'readmsg',
+    'pack_fields',
+    'unpack_fields',
+]
+
+_MSG_START_BYTE = b'\x01'
+
+class MessageFormatError(Exception): ...
+
+async def sendmsg(m: bytes, writer: asyncio.StreamWriter, aesgcm: AESGCM = None, aesnonce: bytes = None, send_headers: bool = True, _content_length: int = None, start_byte: bytes = _MSG_START_BYTE):
+    '''
+    Sends a message.
+
+    :param m: The message
+    :type m: bytes
+    :param writer: The writer
+    :type writer: asyncio.StreamWriter
+    :param aesgcm: AESGCM
+    :type aesgcm: AESGCM
+    :param aesnonce: The nonce
+    :type aesnonce: bytes
+    :param send_headers: Controls whether the protocol headers are sent.
+    :type send_headers: bool
+    :param _content_length: The content length. If ``None``, the content length will be calculated automatically.
+    :type _content_length: int
+    :param start_byte: The start byte.
+    :type start_byte: bytes
+    '''
+    if isinstance(m, str):
+        m = m.encode()
+    if aesgcm:
+        m = aesgcm.encrypt(aesnonce, m, None)
+    if _content_length is None:
+        _content_length = len(m)
+    if send_headers:
+        content_length = utils.int_to_bytes(_content_length)
+        content_length = bytes([len(content_length)]) + content_length
+    else:
+        content_length = b''
+    writer.write(start_byte + content_length + m)
+    await writer.drain()
+
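+# Example (sketch): on the wire a message is
+# <start byte> <length-of-length: 1 byte> <content length> <payload>,
+# so sendmsg(b'hi', writer) writes b'\x01' + b'\x01' + b'\x02' + b'hi'.
+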
+async def readmsg(reader: asyncio.StreamReader, aesgcm: AESGCM = None, aesnonce: bytes = None, start_byte: bytes = _MSG_START_BYTE) -> bytes:
+    '''
+    Receives a message.
+
+    :param reader: The reader
+    :type reader: asyncio.StreamReader
+    :param aesgcm: AESGCM
+    :type aesgcm: AESGCM
+    :param aesnonce: The nonce
+    :type aesnonce: bytes
+    :param start_byte: The start byte
+    :type start_byte: bytes
+
+    :raises MessageFormatError: If the message format is invalid.
+
+    :return: The received message
+    :rtype: bytes
+    '''
+    if await reader.readexactly(1) != start_byte:
+        raise MessageFormatError('invalid message format')
+    content_length_length = await reader.readexactly(1)
+    content_length = await reader.readexactly(int.from_bytes(content_length_length))
+    m = await reader.readexactly(int.from_bytes(content_length))
+    if aesgcm:
+        return aesgcm.decrypt(aesnonce, m, None)
+    return m
+
+def pack_fields(*fields: bytes) -> bytes:
+    '''
+    Packs the fields into a compact bytestring.
+
+    :param fields: The fields
+    :type fields: bytes
+
+    :return: The result
+    :rtype: bytes
+    '''
+    result = b''
+    for field in fields:
+        field_length = utils.int_to_bytes(len(field))
+        field_length_length = utils.int_to_bytes(len(field_length))
+        result += field_length_length + field_length + field
+    return result
+
+def unpack_fields(packed: bytes) -> Sequence[bytes]:
+    '''
+    Unpacks the fields from a bytestring.
+
+    :param packed: The packed fields
+    :type packed: bytes
+
+    :return: The unpacked fields
+    :rtype: Sequence[bytes]
+    '''
+    fields = []
+    buffer = io.BytesIO(packed)
+    while buffer.tell() < len(packed):
+        field_length_length = int.from_bytes(buffer.read(1))
+        field_length = int.from_bytes(buffer.read(field_length_length))
+        fields.append(buffer.read(field_length))
+    return fields
\ No newline at end of file
diff --git a/src/jeb_utils/utils.py b/src/jeb_utils/utils.py
new file mode 100644
index 0000000..2b805af
--- /dev/null
+++ b/src/jeb_utils/utils.py
@@ -0,0 +1,142 @@
+import os
+from typing import Tuple, Literal, Sequence
+
+__all__ = [
+    'is_number',
+    'int_to_bytes',
+    'bits_to_byte',
+    'byte_to_bits',
+    'get_next_lower_integer_multiple',
+    'find_nearest_lower_number',
+    'create_file_if_not_exists',
+]
+
+def is_number(string: str, base: int = 10) -> bool:
+    '''
+    Checks whether the string is a number.
+
+    :param string: The string
+    :type string: str
+    :param base: The base
+    :type base: int
+
+    :return: ``True`` if the string is a number, otherwise ``False``
+    :rtype: bool
+    '''
+    if not string:
+        return False
+    try:
+        int(string, base)
+        return True
+    except ValueError:
+        return False
+
+def int_to_bytes(n: int, signed: bool = False) -> bytes:
+    '''
+    Converts the integer into bytes (dynamic length).
+
+    :param n: The integer
+    :type n: int
+    :param signed: Whether the integer is signed or not.
+    :type signed: bool
+
+    :return: The bytes
+    :rtype: bytes
+    '''
+    n = int(n)
+    length = (n.bit_length() + 7) // 8
+    if length == 0:
+        length = 1
+    return n.to_bytes(length, signed=signed)
+
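+# Example (sketch): the byte length grows with the magnitude of the integer.
+#
+#     >>> int_to_bytes(0)
+#     b'\x00'
+#     >>> int_to_bytes(255)
+#     b'\xff'
+#     >>> int_to_bytes(256)
+#     b'\x01\x00'
+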
+def _bits_to_byte(bits: Tuple[Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1]]) -> int:
+    if len(bits) != 8:
+        raise ValueError('bits must be a tuple of 8 bits')
+    byte = 0
+    for bit in bits:
+        if int(bit) not in (0, 1):
+            raise ValueError('each bit must be 0 or 1')
+        byte = (byte << 1) | int(bit)
+    return byte
+
+def bits_to_byte(*bits: Literal[0, 1]) -> bytes:
+    '''
+    Converts the bits into a single byte.
+
+    :param bits: The bits
+    :type bits: int
+
+    :raises ValueError: If more than 8 bits are specified or a bit is neither ``0`` nor ``1``.
+
+    :return: The byte
+    :rtype: bytes
+    '''
+    if len(bits) > 8:
+        raise ValueError('a maximum of 8 bits can be converted to a byte')
+    while len(bits) < 8:
+        bits = (0,) + bits
+    return bytes([_bits_to_byte(bits)])
+
+def byte_to_bits(b: bytes) -> Tuple[Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1]]:
+    '''
+    Converts the byte to a tuple of bits.
+
+    :param b: The byte
+    :type b: bytes
+
+    :raises ValueError: If more than one byte is specified or ``b`` is neither a bytestring nor a bytearray.
+
+    :return: The bits
+    :rtype: tuple
+    '''
+    if not isinstance(b, (bytes, bytearray)) or len(b) != 1:
+        raise ValueError('input must be a single byte')
+    byte = b[0]
+    return tuple((byte >> i) & 1 for i in reversed(range(8)))
+
+def get_next_lower_integer_multiple(value: int, multiple: int) -> int:
+    '''
+    Returns the next lower integer multiple.
+
+    :param value: The value
+    :type value: int
+    :param multiple: The factor
+    :type multiple: int
+
+    :return: The next lower integer multiple
+    :rtype: int
+    '''
+    return value - (value % multiple)
+
+def find_nearest_lower_number(number_list: Sequence, target: int | float) -> int | float:
+    '''
+    Finds the nearest lower number to the target from the list.
+
+    :param number_list: The list
+    :type number_list: Sequence
+    :param target: The target
+    :type target: int | float
+
+    :return: The nearest lower number to the target from the list.
+    :rtype: int | float
+    '''
+    nearest = None
+    for number in number_list:
+        if number <= target:
+            if nearest is None or number > nearest:
+                nearest = number
+    return nearest
+
+def create_file_if_not_exists(path: str, content: bytes = b''):
+    '''
+    Creates a file if it does not exist.
+
+    :param path: The file path
+    :type path: str
+    :param content: Optional, the content of the file if it is newly created.
+    :type content: bytes
+    '''
+    if not os.path.exists(path):
+        with open(path, 'wb') as f:
+            f.write(content)
\ No newline at end of file