geändert: README.md

geändert:       pyproject.toml
	geändert:       src/jeb_utils/jeb_utils.py
This commit is contained in:
2026-02-19 17:21:17 +01:00
parent a0036b6767
commit 72de06e531
3 changed files with 32 additions and 27 deletions
+3
View File
@@ -77,6 +77,9 @@ pip install jeb-utils --index-url https://jcloud-services.ddns.net/simple/ --ext
- `create_file_if_not_exists`: Creates a file if it does not exist.

## Changelog

### Version 0.1.1
- Support for custom data paths (`jeb_utils.init`)

### Version 0.1.0
- First release
- `auth_utils`
+1 -1
View File
@@ -4,6 +4,6 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "jeb-utils" name = "jeb-utils"
version = "0.1.0" version = "0.1.1"
description = "Common utils for JEB client and server." description = "Common utils for JEB client and server."
dependencies = ["cryptography"] dependencies = ["cryptography"]
+28 -26
View File
@@ -57,6 +57,7 @@ FETCH_START_TYPE_OFFSET = 0xc0
FETCH_START_TYPE_TIMESTAMP = 0xc1 FETCH_START_TYPE_TIMESTAMP = 0xc1
FETCH_RECORDS_CHUNK_SIZE = 1024 ** 2 # 1 MiB FETCH_RECORDS_CHUNK_SIZE = 1024 ** 2 # 1 MiB
SEGMENT_MAX_SIZE = None SEGMENT_MAX_SIZE = None
DATA_DIR = None
log_end_offsets = {} log_end_offsets = {}
@@ -81,13 +82,13 @@ def mktopic(topic_name: str):
:param topic_name: The name of the topic :param topic_name: The name of the topic
:type topic_name: str :type topic_name: str
''' '''
with dbm.open('data/conf/topics', 'c') as db: with dbm.open(f'{DATA_DIR}/conf/topics', 'c') as db:
if topic_name.encode() in db.keys(): if topic_name.encode() in db.keys():
raise exceptions.TopicExistsError(f'topic \'{topic_name}\' already exists') raise exceptions.TopicExistsError(f'topic \'{topic_name}\' already exists')
db[topic_name] = pickle.dumps({'partitions': {1}}) db[topic_name] = pickle.dumps({'partitions': {1}})
db.close() db.close()
os.mkdir(f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}01/') os.mkdir(f'{DATA_DIR}/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}01/')
def rmtopic(topic_name: str): def rmtopic(topic_name: str):
''' '''
@@ -98,7 +99,7 @@ def rmtopic(topic_name: str):
:raises TopicNotFoundError: If the topic does not exist. :raises TopicNotFoundError: If the topic does not exist.
''' '''
with dbm.open('data/conf/topics', 'c') as db: with dbm.open(f'{DATA_DIR}/conf/topics', 'c') as db:
if topic_name.encode() not in db.keys(): if topic_name.encode() not in db.keys():
raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist') raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
partitions = pickle.loads(db[topic_name.encode()])['partitions'] partitions = pickle.loads(db[topic_name.encode()])['partitions']
@@ -108,7 +109,7 @@ def rmtopic(topic_name: str):
partition_dirs = format_topic_partitions(topic_name, partitions) partition_dirs = format_topic_partitions(topic_name, partitions)
for pd in partition_dirs: for pd in partition_dirs:
try: try:
shutil.rmtree(f'data/topics/{pd}/') shutil.rmtree(f'{DATA_DIR}/topics/{pd}/')
except: except:
pass pass
@@ -124,7 +125,7 @@ def mktopicpart(topic_name: str, partition_number: int):
if partition_number < 1 or partition_number > 99: if partition_number < 1 or partition_number > 99:
raise ValueError('partition number must be between 1 and 99') raise ValueError('partition number must be between 1 and 99')
formatted_partition_number = f'{(int(partition_number)):02d}' formatted_partition_number = f'{(int(partition_number)):02d}'
with dbm.open('data/conf/topics', 'c') as db: with dbm.open(f'{DATA_DIR}/conf/topics', 'c') as db:
if topic_name.encode() not in db.keys(): if topic_name.encode() not in db.keys():
raise exceptions.exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist') raise exceptions.exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
partitions = pickle.loads(db[topic_name.encode()])['partitions'] partitions = pickle.loads(db[topic_name.encode()])['partitions']
@@ -133,7 +134,7 @@ def mktopicpart(topic_name: str, partition_number: int):
db[topic_name] = pickle.dumps({'partitions': partitions | {int(formatted_partition_number)}}) db[topic_name] = pickle.dumps({'partitions': partitions | {int(formatted_partition_number)}})
db.close() db.close()
os.mkdir(f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/') os.mkdir(f'{DATA_DIR}/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')
def rmtopicpart(topic_name: str, partition_number: int): def rmtopicpart(topic_name: str, partition_number: int):
''' '''
@@ -145,7 +146,7 @@ def rmtopicpart(topic_name: str, partition_number: int):
:type partition_number: int :type partition_number: int
''' '''
formatted_partition_number = f'{(int(partition_number)):02d}' formatted_partition_number = f'{(int(partition_number)):02d}'
with dbm.open('data/conf/topics', 'c') as db: with dbm.open(f'{DATA_DIR}/conf/topics', 'c') as db:
if topic_name.encode() not in db.keys(): if topic_name.encode() not in db.keys():
raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist') raise exceptions.TopicNotFoundError(f'topic \'{topic_name}\' does not exist')
partitions = pickle.loads(db[topic_name.encode()])['partitions'] partitions = pickle.loads(db[topic_name.encode()])['partitions']
@@ -154,7 +155,7 @@ def rmtopicpart(topic_name: str, partition_number: int):
db[topic_name] = pickle.dumps({'partitions': partitions - {int(formatted_partition_number)}}) db[topic_name] = pickle.dumps({'partitions': partitions - {int(formatted_partition_number)}})
db.close() db.close()
shutil.rmtree(f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/') shutil.rmtree(f'{DATA_DIR}/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/')
def get_topics(): def get_topics():
''' '''
@@ -163,7 +164,7 @@ def get_topics():
:return: The topics :return: The topics
:rtype: dict :rtype: dict
''' '''
with dbm.open('data/conf/topics', 'c') as db: with dbm.open(f'{DATA_DIR}/conf/topics', 'c') as db:
topics = {k.decode(): pickle.loads(v) for k, v in db.items()} topics = {k.decode(): pickle.loads(v) for k, v in db.items()}
db.close() db.close()
return topics return topics
@@ -172,9 +173,9 @@ def _create_base_segment_files(topics: dict):
for topic, v in topics.items(): for topic, v in topics.items():
partitions = format_topic_partitions(topic, v['partitions']) partitions = format_topic_partitions(topic, v['partitions'])
for p in partitions: for p in partitions:
utils.create_file_if_not_exists(f'data/topics/{p}/' + '0' * 16 + '.log') utils.create_file_if_not_exists(f'{DATA_DIR}/topics/{p}/' + '0' * 16 + '.log')
utils.create_file_if_not_exists(f'data/topics/{p}/' + '0' * 16 + '.index') utils.create_file_if_not_exists(f'{DATA_DIR}/topics/{p}/' + '0' * 16 + '.index')
utils.create_file_if_not_exists(f'data/topics/{p}/' + '0' * 16 + '.timeindex') utils.create_file_if_not_exists(f'{DATA_DIR}/topics/{p}/' + '0' * 16 + '.timeindex')
def get_segment_base_timestamp(topic: str, segment: int) -> int: def get_segment_base_timestamp(topic: str, segment: int) -> int:
''' '''
@@ -188,7 +189,7 @@ def get_segment_base_timestamp(topic: str, segment: int) -> int:
:return: The base timestamp (milliseconds since 1970) :return: The base timestamp (milliseconds since 1970)
:rtype: int :rtype: int
''' '''
with open(f'data/topics/{topic}/{segment:016x}.log', 'rb') as logfile: with open(f'{DATA_DIR}/topics/{topic}/{segment:016x}.log', 'rb') as logfile:
# go to the topic length # go to the topic length
logfile.seek(20) logfile.seek(20)
@@ -207,12 +208,12 @@ def get_segment_base_timestamp(topic: str, segment: int) -> int:
def get_log_end_offsets(topics: dict): def get_log_end_offsets(topics: dict):
_topics = topics _topics = topics
topics = [f'{t}{TOPIC_PARTITION_SEPARATOR}{p:02d}' for t, v in _topics.items() for p in v['partitions']] topics = [f'{t}{TOPIC_PARTITION_SEPARATOR}{p:02d}' for t, v in _topics.items() for p in v['partitions']]
last_topics_segments = {t: sorted([s for s in os.listdir(f'data/topics/{t}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16])[-1] for t in topics} last_topics_segments = {t: sorted([s for s in os.listdir(f'{DATA_DIR}/topics/{t}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16])[-1] for t in topics}
topic_segments = {t: Topic(t).segments for t in topics} topic_segments = {t: Topic(t).segments for t in topics}
for topic, segments in topic_segments.items(): for topic, segments in topic_segments.items():
log_end_offset = 0 log_end_offset = 0
for segment in segments: for segment in segments:
with open(f'data/topics/{topic}/{segment.base_offset:016x}.log', 'rb') as logfile: with open(f'{DATA_DIR}/topics/{topic}/{segment.base_offset:016x}.log', 'rb') as logfile:
logfile.seek(0, os.SEEK_END) logfile.seek(0, os.SEEK_END)
file_size = logfile.tell() file_size = logfile.tell()
logfile.seek(0) logfile.seek(0)
@@ -229,18 +230,19 @@ def get_log_end_offsets(topics: dict):
logfile.close() logfile.close()
log_end_offsets[topic] = log_end_offset log_end_offsets[topic] = log_end_offset
def init(config: dict = {'segment_max_size': 1024 * 256}): def init(config: dict = {'segment_max_size': 1024 * 256, 'data_dir': './data'}):
''' '''
Initializes the module. Calling this function is required before using most of the functions of the module. Initializes the module. Calling this function is required before using most of the functions of the module.
:param config: The configuration :param config: The configuration
:type config: dict :type config: dict
''' '''
global SEGMENT_MAX_SIZE global SEGMENT_MAX_SIZE, DATA_DIR
SEGMENT_MAX_SIZE = config.get('segment_max_size', SEGMENT_MAX_SIZE) SEGMENT_MAX_SIZE = config.get('segment_max_size', SEGMENT_MAX_SIZE)
DATA_DIR = config.get('data_dir', DATA_DIR)
topics = get_topics() topics = get_topics()
# segments = {t: sorted([int(s.split('.')[0], 16) for s in os.listdir(f'data/topics/{t}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16]) for t in [f'{t}{TOPIC_PARTITION_SEPARATOR}{p:02d}' for t, v in topics.items() for p in v['partitions']]} # segments = {t: sorted([int(s.split('.')[0], 16) for s in os.listdir(f'{DATA_DIR}/topics/{t}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16]) for t in [f'{t}{TOPIC_PARTITION_SEPARATOR}{p:02d}' for t, v in topics.items() for p in v['partitions']]}
# base_timestamps = {t: {s: Segment(t, s).base_timestamp for s in ss} for t, ss in segments.items()} # base_timestamps = {t: {s: Segment(t, s).base_timestamp for s in ss} for t, ss in segments.items()}
@@ -399,7 +401,7 @@ def create_record(topic: str, timestamp: int, record_data: bytes, key: bytes = b
check_topic_exists(topic) check_topic_exists(topic)
topic_dir = f'data/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/' topic_dir = f'{DATA_DIR}/topics/{topic_name}{TOPIC_PARTITION_SEPARATOR}{formatted_partition_number}/'
utils.create_file_if_not_exists(topic_dir + '0' * 16 + '.log') utils.create_file_if_not_exists(topic_dir + '0' * 16 + '.log')
segments = sorted([int(f.split('.')[0], 16) for f in os.listdir(topic_dir) if f.endswith('.log') and utils.is_number(f.split('.')[0], 16) and len(f.split('.')[0]) == 16]) segments = sorted([int(f.split('.')[0], 16) for f in os.listdir(topic_dir) if f.endswith('.log') and utils.is_number(f.split('.')[0], 16) and len(f.split('.')[0]) == 16])
last_segment = max(segments) last_segment = max(segments)
@@ -478,12 +480,12 @@ async def fetch_records(topic: str, start_type: int, start: int, max_bytes: int,
if start_type not in (FETCH_START_TYPE_OFFSET, FETCH_START_TYPE_TIMESTAMP): if start_type not in (FETCH_START_TYPE_OFFSET, FETCH_START_TYPE_TIMESTAMP):
raise ValueError('invalid start type') raise ValueError('invalid start type')
check_topic_exists(topic.topic_name) check_topic_exists(topic.topic_name)
segments = [int(f.split('.')[0], 16) for f in os.listdir(f'data/topics/{topic.topic_name}/') if utils.is_number(f.split('.')[0], 16) and len(f.split('.')[0]) == 16 and f.endswith('.log')] segments = [int(f.split('.')[0], 16) for f in os.listdir(f'{DATA_DIR}/topics/{topic.topic_name}/') if utils.is_number(f.split('.')[0], 16) and len(f.split('.')[0]) == 16 and f.endswith('.log')]
if start_type == FETCH_START_TYPE_TIMESTAMP: if start_type == FETCH_START_TYPE_TIMESTAMP:
segment_base_timestamps = {s: s.base_timestamp for s in topic.segments} segment_base_timestamps = {s: s.base_timestamp for s in topic.segments}
s = {v: k for k, v in segment_base_timestamps.items()}[utils.find_nearest_lower_number(list(segment_base_timestamps.values()), start) or min(segment_base_timestamps.values())] s = {v: k for k, v in segment_base_timestamps.items()}[utils.find_nearest_lower_number(list(segment_base_timestamps.values()), start) or min(segment_base_timestamps.values())]
with open(f'data/topics/{topic.topic_name}/{s.base_offset:016x}.timeindex', 'rb') as f: with open(f'{DATA_DIR}/topics/{topic.topic_name}/{s.base_offset:016x}.timeindex', 'rb') as f:
timestamps = {} timestamps = {}
f.seek(0) f.seek(0)
while True: while True:
@@ -511,7 +513,7 @@ async def fetch_records(topic: str, start_type: int, start: int, max_bytes: int,
start_seg = utils.find_nearest_lower_number(segments, start) start_seg = utils.find_nearest_lower_number(segments, start)
with open(f'data/topics/{topic.topic_name}/' + f'{start_seg:016x}' + '.index', 'rb') as idxfile: with open(f'{DATA_DIR}/topics/{topic.topic_name}/' + f'{start_seg:016x}' + '.index', 'rb') as idxfile:
idxfile.seek(0, os.SEEK_END) idxfile.seek(0, os.SEEK_END)
file_size = idxfile.tell() file_size = idxfile.tell()
idxfile.seek(idx_offset // 1024 * 12) idxfile.seek(idx_offset // 1024 * 12)
@@ -521,7 +523,7 @@ async def fetch_records(topic: str, start_type: int, start: int, max_bytes: int,
warnings.warn(f'Index entry offset mismatch: expected {idx_offset}, got {int.from_bytes(index_entry[:8])}', FileCorruptWarning) warnings.warn(f'Index entry offset mismatch: expected {idx_offset}, got {int.from_bytes(index_entry[:8])}', FileCorruptWarning)
idxfile.close() idxfile.close()
with open(f'data/topics/{topic.topic_name}/{start_seg:016x}.log', 'rb') as logfile: with open(f'{DATA_DIR}/topics/{topic.topic_name}/{start_seg:016x}.log', 'rb') as logfile:
logfile.seek(idx_entry_pos) logfile.seek(idx_entry_pos)
offset = idx_offset offset = idx_offset
record_position = 0 record_position = 0
@@ -550,7 +552,7 @@ async def fetch_records(topic: str, start_type: int, start: int, max_bytes: int,
print(max_bytes) print(max_bytes)
for i, segment in enumerate(segments[segments.index(start_seg):]): for i, segment in enumerate(segments[segments.index(start_seg):]):
records_sent = 0 records_sent = 0
with open(f'data/topics/{topic.topic_name}/' + f'{segment:016x}' + '.log', 'rb') as logfile: with open(f'{DATA_DIR}/topics/{topic.topic_name}/' + f'{segment:016x}' + '.log', 'rb') as logfile:
if i == 0: if i == 0:
logfile.seek(record_position) logfile.seek(record_position)
else: else:
@@ -597,7 +599,7 @@ class Topic:
@property @property
def segments(self): def segments(self):
return [Segment(self.topic_name, bo) for bo in sorted([int(s.split('.')[0], 16) for s in os.listdir(f'data/topics/{self.topic_name}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16])] return [Segment(self.topic_name, bo) for bo in sorted([int(s.split('.')[0], 16) for s in os.listdir(f'{DATA_DIR}/topics/{self.topic_name}') if s.endswith('.log') and utils.is_number(s.split('.')[0], 16) and len(s.split('.')[0]) == 16])]
class Segment: class Segment:
def __init__(self, topic: str, base_offset: int): def __init__(self, topic: str, base_offset: int):
@@ -609,7 +611,7 @@ class Segment:
return f'Segment({self.topic}, {self.base_offset})' return f'Segment({self.topic}, {self.base_offset})'
def get_base_timestamp(self): def get_base_timestamp(self):
with open(f'data/topics/{self.topic}/{self.base_offset:016x}.timeindex', 'rb') as logfile: with open(f'{DATA_DIR}/topics/{self.topic}/{self.base_offset:016x}.timeindex', 'rb') as logfile:
timestamp_bytes = logfile.read(8) timestamp_bytes = logfile.read(8)
if len(timestamp_bytes) < 8: if len(timestamp_bytes) < 8:
timestamp = None timestamp = None