# Copyright 2026 jCloud Services GbR
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from jeb_utils import exceptions, jeb_utils, utils

import binascii
import dbm
import os
import shutil
import time
import warnings

import config_parser

__all__ = [
    'JEBServerUtils',
    'Segment',
    'Topic',
]

# On-disk record layout (all integers big-endian, per int.to_bytes default):
#   8 bytes  record size (of everything that follows)
#   8 bytes  offset
#   4 bytes  CRC32 of the payload
#   2 bytes  topic name length (bytes)
#   n bytes  topic name
#   1 byte   attributes (compression type / timestamp type)
#   8 bytes  timestamp (ms since 1970)
#   4 bytes  key length, then key
#   4 bytes  payload length, then payload
#   1 byte   header count, then per header: 4-byte key length, key,
#            4-byte value length, value

# One .index / .timeindex entry is written every this many records.
_INDEX_INTERVAL = 1024
# An index entry is 8 bytes offset + 4 bytes file position.
_INDEX_ENTRY_SIZE = 12


def _create_file_if_not_exists(path: str, content: bytes = b''):
    '''
    Creates a file if it does not exist.

    :param path: The file path
    :type path: str
    :param content: Optional, the content of the file if it is newly created.
    :type content: bytes
    '''
    if not os.path.exists(path):
        with open(path, 'wb') as f:
            f.write(content)


class JEBServerUtils:

    def __init__(self, data_dir: str, *, segment_max_size: int = 2 ** 32,
                 fetch_records_chunk_size: int = 1024 ** 2):
        '''
        The utilities for the jeb server.

        :param data_dir: Base directory containing ``conf/`` and ``topics/``
        :type data_dir: str
        :param segment_max_size: Maximum segment size in bytes before a new
            segment is started
        :type segment_max_size: int
        :param fetch_records_chunk_size: Chunk size in bytes used when
            streaming records in ``fetch_records``
        :type fetch_records_chunk_size: int
        '''
        self.data_dir = data_dir
        self.segment_max_size = segment_max_size
        self.fetch_records_chunk_size = fetch_records_chunk_size
        # topic-partition name -> next offset to assign (log end offset)
        self._log_end_offsets = {}
        self._init()

    def _init(self):
        '''Creates missing base segment files and scans the logs once.'''
        self._create_base_segment_files()
        self._init_log_end_offsets()

    def _create_base_segment_files(self):
        '''
        Creates the base (offset 0) segment files — ``.log``, ``.index`` and
        ``.timeindex`` — for every known topic partition.
        '''
        base_name = '0' * 16
        for topic, v in self.get_topics().items():
            for partition_dir in jeb_utils.format_topic_partitions(topic, v['partitions']):
                for extension in ('.log', '.index', '.timeindex'):
                    _create_file_if_not_exists(
                        f'{self.data_dir}/topics/{partition_dir}/{base_name}{extension}')

    def _init_log_end_offsets(self):
        '''
        Initializes the log end offsets by counting the records in every
        segment of every topic partition.
        '''
        # FIX: removed a bogus ``global _log_end_offsets`` statement — the
        # state lives on the instance, no such module global exists.
        topics = [f'{t}{jeb_utils.TOPIC_PARTITION_SEPARATOR}{p:02d}'
                  for t, v in self.get_topics().items()
                  for p in v['partitions']]
        for topic in topics:
            log_end_offset = 0
            for segment in Topic(topic, self).segments:
                path = f'{self.data_dir}/topics/{topic}/{segment.base_offset:016x}.log'
                with open(path, 'rb') as logfile:
                    logfile.seek(0, os.SEEK_END)
                    file_size = logfile.tell()
                    logfile.seek(0)
                    while True:
                        record_size_bytes = logfile.read(8)
                        if len(record_size_bytes) < 8:
                            break
                        record_size = int.from_bytes(record_size_bytes)
                        if logfile.tell() + record_size > file_size:
                            # truncated trailing record — do not count it
                            break
                        logfile.seek(record_size, os.SEEK_CUR)
                        log_end_offset += 1
            self._log_end_offsets[topic] = log_end_offset

    def get_segment_base_timestamp(self, topic: str, segment: int) -> int:
        '''
        Returns the base timestamp of a segment, i.e. the timestamp of its
        first record.

        :param topic: The topic
        :type topic: str
        :param segment: The segment base offset
        :type segment: int
        :return: The base timestamp (milliseconds since 1970), or ``None``
            if the segment holds no complete record header
        :rtype: int
        '''
        with open(f'{self.data_dir}/topics/{topic}/{segment:016x}.log', 'rb') as logfile:
            # skip record size (8), offset (8) and CRC (4) of the first record
            logfile.seek(20)
            topic_length = int.from_bytes(logfile.read(2))
            # skip the topic name and the attributes byte
            logfile.seek(topic_length + 1, os.SEEK_CUR)
            timestamp_bytes = logfile.read(8)
        if len(timestamp_bytes) < 8:
            return None
        return int.from_bytes(timestamp_bytes)

    def get_topics(self):
        '''
        Returns all topics.

        :return: The topics, mapping topic name to its configuration dict
        :rtype: dict
        '''
        with dbm.open(f'{self.data_dir}/conf/topics', 'c') as db:
            return {k.decode(): config_parser.parse.json.parse_json(v.decode())
                    for k, v in db.items()}

    def mktopic(self, topic_name: str):
        '''
        Creates a topic with a single partition (01).

        :param topic_name: The name of the topic
        :type topic_name: str
        :raises TopicExistsError: If the topic already exists.
        '''
        with dbm.open(f'{self.data_dir}/conf/topics', 'c') as db:
            if topic_name.encode() in db.keys():
                raise exceptions.TopicExistsError(f'topic \'{topic_name}\' already exists')
            db[topic_name] = config_parser.serialize.json.serialize({'partitions': [1]})
        os.mkdir(f'{self.data_dir}/topics/{topic_name}{jeb_utils.TOPIC_PARTITION_SEPARATOR}01/')

    def rmtopic(self, topic_name: str):
        '''
        Removes a topic and all of its partition directories.

        :param topic_name: The topic name
        :type topic_name: str
        :raises TopicNotFoundError: If the topic does not exist.
        '''
        with dbm.open(f'{self.data_dir}/conf/topics', 'c') as db:
            if topic_name.encode() not in db.keys():
                raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
            partitions = config_parser.parse.json.parse_json(
                db[topic_name.encode()].decode())['partitions']
            del db[topic_name]
        # best effort: a partition directory may already be missing
        for partition_dir in jeb_utils.format_topic_partitions(topic_name, partitions):
            try:
                shutil.rmtree(f'{self.data_dir}/topics/{partition_dir}/')
            except OSError:
                pass

    def mktopicpart(self, topic_name: str, partition_number: int):
        '''
        Creates a partition of a topic.

        :param topic_name: The topic name
        :type topic_name: str
        :param partition_number: The partition number (1-99)
        :type partition_number: int
        :raises ValueError: If the partition number is out of range.
        :raises TopicNotFoundError: If the topic does not exist.
        :raises TopicPartitionExistsError: If the partition already exists.
        '''
        if partition_number < 1 or partition_number > 99:
            raise ValueError('partition number must be between 1 and 99')
        formatted_partition_number = f'{(int(partition_number)):02d}'
        with dbm.open(f'{self.data_dir}/conf/topics', 'c') as db:
            if topic_name.encode() not in db.keys():
                raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
            partitions = config_parser.parse.json.parse_json(
                db[topic_name.encode()].decode())['partitions']
            if partition_number in partitions:
                raise exceptions.TopicPartitionExistsError(f'partition \'{formatted_partition_number}\' already exists for topic \'{topic_name}\'')
            db[topic_name] = config_parser.serialize.json.serialize(
                {'partitions': sorted(set(partitions) | {int(formatted_partition_number)})})
        os.mkdir(f'{self.data_dir}/topics/{topic_name}{jeb_utils.TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')

    def rmtopicpart(self, topic_name: str, partition_number: int):
        '''
        Removes a partition of a topic.

        :param topic_name: The topic name
        :type topic_name: str
        :param partition_number: The partition number
        :type partition_number: int
        :raises TopicNotFoundError: If the topic does not exist.
        :raises TopicPartitionNotFoundError: If the partition does not exist.
        '''
        formatted_partition_number = f'{(int(partition_number)):02d}'
        with dbm.open(f'{self.data_dir}/conf/topics', 'c') as db:
            if topic_name.encode() not in db.keys():
                raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
            partitions = config_parser.parse.json.parse_json(
                db[topic_name.encode()].decode())['partitions']
            if partition_number not in partitions:
                raise exceptions.TopicPartitionNotFoundError(f'partition \'{formatted_partition_number}\' does not exist for topic \'{topic_name}\'')
            db[topic_name] = config_parser.serialize.json.serialize(
                {'partitions': sorted(set(partitions) - {int(formatted_partition_number)})})
        shutil.rmtree(f'{self.data_dir}/topics/{topic_name}{jeb_utils.TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')

    def check_topic_exists(self, topic: str):
        '''
        Checks whether a topic partition exists.

        :param topic: The topic-partition name (``name<SEP>NN``)
        :type topic: str
        :raises TopicNotFoundError: If the topic does not exist
        :raises TopicPartitionNotFoundError: If the partition does not exist
        '''
        topics = self.get_topics()
        topic_name, formatted_partition_number = topic.split(jeb_utils.TOPIC_PARTITION_SEPARATOR)
        raw_partition_number = int(formatted_partition_number)
        if topic_name not in topics:
            raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
        if raw_partition_number not in topics[topic_name]['partitions']:
            raise exceptions.TopicPartitionNotFoundError(f'partition \'{formatted_partition_number}\' does not exist for topic \'{topic_name}\'')

    def create_record(self, topic: str, timestamp: int, record_data: bytes,
                      key: bytes = b'', compression_type: int = 0,
                      headers: dict | None = None):
        '''
        Creates (appends) a record in the topic partition's active segment.

        :param topic: The topic-partition name (``name<SEP>NN``)
        :type topic: str
        :param timestamp: The timestamp in ms; ``0`` means "now"
            (timestamp type 1, log-append time)
        :type timestamp: int
        :param record_data: The payload
        :type record_data: bytes
        :param key: The key
        :type key: bytes
        :param compression_type: The compression type
        :type compression_type: int
        :param headers: The header dictionary (bytes keys and values;
            at most 255 entries fit the 1-byte count — not enforced here)
        :type headers: dict
        :raises FormatError: If the topic name is invalid.
        '''
        if headers is None:
            # FIX: was a mutable default argument (``headers: dict = {}``)
            headers = {}
        topic_name, formatted_partition_number = topic.split(jeb_utils.TOPIC_PARTITION_SEPARATOR)
        if not jeb_utils.validate_topic_name(topic_name):
            raise exceptions.FormatError(f'invalid topic name: \'{topic}\'')
        self.check_topic_exists(topic)
        topic_dir = (f'{self.data_dir}/topics/{topic_name}'
                     f'{jeb_utils.TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')
        utils.create_file_if_not_exists(topic_dir + '0' * 16 + '.log')
        segments = [int(f.split('.')[0], 16) for f in os.listdir(topic_dir)
                    if f.endswith('.log') and utils.is_number(f.split('.')[0], 16)
                    and len(f.split('.')[0]) == 16]
        last_segment = max(segments)
        if timestamp == 0:
            timestamp = int(time.time() * 1000)
            timestamp_type = 1  # log-append time
        else:
            timestamp_type = 0  # caller-supplied creation time
        offset = self._log_end_offsets[topic]
        # FIX: length is the encoded byte length (``len(topic)`` counted
        # characters, which diverges from the written bytes for non-ASCII).
        topic_bytes = topic.encode()
        record = b''
        record += offset.to_bytes(8)                                   # Offset
        record += binascii.crc32(record_data).to_bytes(4)              # CRC
        record += len(topic_bytes).to_bytes(2)                         # Topic name length
        record += topic_bytes                                          # Topic
        record += jeb_utils.AttributesByte(compression_type, timestamp_type).to_byte()  # Attributes
        record += timestamp.to_bytes(8)                                # Timestamp
        record += len(key).to_bytes(4)                                 # Key Length
        record += key                                                  # Key
        record += len(record_data).to_bytes(4)                         # Content Length
        record += record_data                                          # Content
        record += len(headers).to_bytes(1)                             # Header count
        # FIX: loop variables renamed — they shadowed the ``key`` parameter
        for header_key, header_value in headers.items():
            record += len(header_key).to_bytes(4)
            record += header_key
            record += len(header_value).to_bytes(4)
            record += header_value
        record = len(record).to_bytes(8) + record                      # Prepend Record Size
        # roll over to a new segment (named after the current offset) if the
        # active one would exceed the configured maximum
        if os.path.getsize(topic_dir + f'{last_segment:016x}.log') + len(record) > self.segment_max_size:
            last_segment = offset
        with open(topic_dir + f'{last_segment:016x}.log', 'ab') as logfile:
            # append mode positions at end of file, so this is the record's
            # start position inside the segment
            record_start = logfile.tell()
            logfile.write(record)
        utils.create_file_if_not_exists(topic_dir + f'{last_segment:016x}.index')
        utils.create_file_if_not_exists(topic_dir + f'{last_segment:016x}.timeindex')
        if offset % _INDEX_INTERVAL == 0:
            # FIX: the index entry used to store the file size AFTER the
            # append (the position of the NEXT record); it must point at the
            # start of the record carrying this offset.
            index = offset.to_bytes(8) + record_start.to_bytes(4)
            timeindex = timestamp.to_bytes(8) + offset.to_bytes(8)
            with open(topic_dir + f'{last_segment:016x}.index', 'ab') as idxfile:
                idxfile.write(index)
            with open(topic_dir + f'{last_segment:016x}.timeindex', 'ab') as timeidxfile:
                timeidxfile.write(timeindex)
        self._log_end_offsets[topic] += 1

    def fetch_records(self, topic: str, start_type: int, start: int, max_bytes: int):
        '''
        Fetches records from the topic and yields them in chunks.

        Yields a status block first (``b'\\xa0\\x30'`` ok, ``b'\\xb1\\x31'``
        start out of range), then ``b'\\x01'``-prefixed data chunks, and
        finally ``b'\\x00'`` as EOF.

        :param topic: The topic name
        :type topic: str
        :param start_type: Whether ``start`` is an offset or a timestamp
        :type start_type: int
        :param start: The start offset or timestamp
        :type start: int
        :param max_bytes: The maximum number of bytes to fetch
        :type max_bytes: int
        :raises FormatError: If the topic name is invalid.
        :raises ValueError: If the start type is unknown.
        '''
        topic_obj = Topic(topic, self)
        topic_name, _ = topic_obj.topic_name.split(jeb_utils.TOPIC_PARTITION_SEPARATOR)
        if not jeb_utils.validate_topic_name(topic_name):
            raise exceptions.FormatError(f'invalid topic name: \'{topic_obj.topic_name}\'')
        if start_type not in (jeb_utils.FETCH_START_TYPE_OFFSET, jeb_utils.FETCH_START_TYPE_TIMESTAMP):
            raise ValueError('invalid start type')
        self.check_topic_exists(topic_obj.topic_name)
        topic_path = f'{self.data_dir}/topics/{topic_obj.topic_name}'
        segments = [int(f.split('.')[0], 16) for f in os.listdir(f'{topic_path}/')
                    if utils.is_number(f.split('.')[0], 16)
                    and len(f.split('.')[0]) == 16 and f.endswith('.log')]
        if start_type == jeb_utils.FETCH_START_TYPE_TIMESTAMP:
            # resolve the timestamp to the nearest offset at or below it via
            # the segment's timeindex
            segment_base_timestamps = {s: s.base_timestamp for s in topic_obj.segments}
            nearest_base = (utils.find_nearest_lower_number(
                list(segment_base_timestamps.values()), start)
                or min(segment_base_timestamps.values()))
            seg = {v: k for k, v in segment_base_timestamps.items()}[nearest_base]
            timestamps = {}
            with open(f'{topic_path}/{seg.base_offset:016x}.timeindex', 'rb') as f:
                while True:
                    timestamp_bytes = f.read(8)
                    if len(timestamp_bytes) < 8:
                        break
                    offset_bytes = f.read(8)
                    if len(offset_bytes) < 8:
                        break
                    timestamps[int.from_bytes(timestamp_bytes)] = int.from_bytes(offset_bytes)
            start = timestamps[utils.find_nearest_lower_number(timestamps.keys(), start)
                               or min(timestamps.keys())]
        if start < 0 or start >= self._log_end_offsets[topic_obj.topic_name]:
            yield b'\xb1\x31'  # start offset out of range
            return
        idx_offset = jeb_utils.get_next_lower_index_entry(start)
        start_seg = utils.find_nearest_lower_number(segments, start)
        # NOTE(review): the entry slot is computed from the absolute offset;
        # for segments whose base offset is non-zero this looks suspect —
        # the mismatch warning below will fire in that case. Verify against
        # get_next_lower_index_entry's contract.
        with open(f'{topic_path}/{start_seg:016x}.index', 'rb') as idxfile:
            idxfile.seek(idx_offset // _INDEX_INTERVAL * _INDEX_ENTRY_SIZE)
            index_entry = idxfile.read(_INDEX_ENTRY_SIZE)
            idx_entry_pos = int.from_bytes(index_entry[8:])
            if int.from_bytes(index_entry[:8]) != idx_offset:
                warnings.warn(f'Index entry offset mismatch: expected {idx_offset}, got {int.from_bytes(index_entry[:8])}', jeb_utils.FileCorruptWarning)
        # scan forward from the index entry to the exact start record position
        with open(f'{topic_path}/{start_seg:016x}.log', 'rb') as logfile:
            logfile.seek(idx_entry_pos)
            offset = idx_offset
            # FIX: was initialized to 0, which made a fetch starting exactly
            # on an index-entry offset re-read from the segment start
            record_position = idx_entry_pos
            while offset < start:
                record_length_bytes = logfile.read(8)
                if len(record_length_bytes) < 8:
                    break
                record_length = int.from_bytes(record_length_bytes)
                offset_bytes = logfile.read(8)
                if len(offset_bytes) < 8:
                    break
                offset = int.from_bytes(offset_bytes)
                record_position = logfile.tell() - 16
                # skip the rest of the record (offset field already consumed)
                logfile.seek(record_length - 8, os.SEEK_CUR)
        yield b'\xa0\x30'  # ok
        segments = sorted(segments)
        bytes_fetched = 0
        for i, segment in enumerate(segments[segments.index(start_seg):]):
            with open(f'{topic_path}/{segment:016x}.log', 'rb') as logfile:
                # only the first segment starts mid-file
                logfile.seek(record_position if i == 0 else 0)
                while bytes_fetched <= max_bytes:
                    record_size_bytes = logfile.read(8)
                    if len(record_size_bytes) < 8:
                        break
                    record_size = int.from_bytes(record_size_bytes)
                    if bytes_fetched + 8 + record_size > max_bytes:
                        break
                    # stream the record in fetch_records_chunk_size pieces
                    to_send = record_size_bytes
                    for _ in range(record_size // self.fetch_records_chunk_size):
                        to_send += logfile.read(self.fetch_records_chunk_size)
                        yield b'\x01' + to_send
                        to_send = b''
                    to_send += logfile.read(record_size % self.fetch_records_chunk_size)
                    yield b'\x01' + to_send
                    bytes_fetched += 8 + record_size
            if bytes_fetched >= max_bytes:
                break
        yield b'\x00'  # EOF


class Segment:

    def __init__(self, topic: str, base_offset: int, jeb_server_utils: JEBServerUtils):
        '''
        Represents a segment of a topic.

        :param topic: The topic name
        :type topic: str
        :param base_offset: The base offset of the segment
        :type base_offset: int
        :param jeb_server_utils: The ``JEBServerUtils`` instance
        :type jeb_server_utils: JEBServerUtils
        '''
        self.topic = topic
        self.base_offset = base_offset
        self.jeb_server_utils = jeb_server_utils
        self.base_timestamp = self.get_base_timestamp()

    def __repr__(self):
        return f'Segment({self.topic}, {self.base_offset})'

    def get_base_timestamp(self):
        '''
        Returns the base timestamp of the segment.

        :return: The base timestamp (milliseconds since 1970)
        :rtype: int
        '''
        return self.jeb_server_utils.get_segment_base_timestamp(self.topic, self.base_offset)


class Topic:

    def __init__(self, topic_name: str, jeb_server_utils: JEBServerUtils):
        '''
        Represents a topic.

        :param topic_name: The name of the topic
        :type topic_name: str
        :param jeb_server_utils: The ``JEBServerUtils`` instance
        :type jeb_server_utils: JEBServerUtils
        '''
        self.topic_name = topic_name
        self.jeb_server_utils = jeb_server_utils

    def __repr__(self):
        return f'Topic({self.topic_name})'

    @property
    def segments(self):
        '''
        Returns the segments of the topic, ordered by base offset.

        :return: The segments
        :rtype: list
        '''
        directory = f'{self.jeb_server_utils.data_dir}/topics/{self.topic_name}'
        base_offsets = sorted(
            int(name.split('.')[0], 16) for name in os.listdir(directory)
            if name.endswith('.log') and utils.is_number(name.split('.')[0], 16)
            and len(name.split('.')[0]) == 16)
        return [Segment(self.topic_name, bo, self.jeb_server_utils) for bo in base_offsets]