geändert: README.md

geändert:       client/client.py
	gelöscht:       lib/__pycache__/crypto_utils.cpython-313.pyc
	gelöscht:       lib/__pycache__/jebp_utils.cpython-313.pyc
	gelöscht:       lib/__pycache__/terminal_table.cpython-313.pyc
	geändert:       lib/jebp_utils.py
	gelöscht:       server/clients_management/chclient.py
	gelöscht:       server/clients_management/rmclient.py
	neue Datei:     server/data/conf/client_admin_rights
	umbenannt:      server/config/clients/fingerprints -> server/data/conf/client_fingerprints
	neue Datei:     server/data/conf/topics
	geändert:       server/server.py
	neue Datei:     server/utils/clients_management/chclient.py
	neue Datei:     server/utils/clients_management/lsclients.py
	umbenannt:      server/clients_management/mkclient.py -> server/utils/clients_management/mkclient.py
	neue Datei:     server/utils/clients_management/rmclient.py
	umbenannt:      server/clients_management/lsclients.py -> server/utils/topics_management/lstopics.py
This commit is contained in:
2026-01-11 12:54:26 +01:00
parent bfec87dde6
commit b1ac351ad1
17 changed files with 341 additions and 73 deletions
+167 -28
View File
@@ -1,28 +1,167 @@
# jeb # jeb
jCloud Event Bus jCloud Event Bus
## jebp (jCloud Event Bus Protocol) ## jebp (jCloud Event Bus Protocol)
### Messages ### Messages
All messages begin with `0x01`, followed by the content length length. The content length length is the length of the content length and representated by one byte. Then follows the content length and after that the message content. All messages begin with `0x01`, followed by the content length length. The content length length is the length of the content length and representated by one byte. Then follows the content length and after that the message content.
### Handshake ### Handshake
1. Server: Server protocol, version 1. Server: Server protocol, version
2. Encryption 2. Encryption
1. Client: Serialized client public key 1. Client: Serialized client public key
2. Server: Serialized server public key 2. Server: Serialized server public key
3. Client: Client nonce 3. Client: Client nonce
4. Server: Server nonce 4. Server: Server nonce
5. Client: Random bytes (encrypted) 5. Client: Random bytes (encrypted)
6. Server: received bytes from client (not encrypted) 6. Server: received bytes from client (not encrypted)
The client closes the connection if the received bytes from server (7.) do not match the generated bytes (6.) to prevent malfunction and test the encryption. The client closes the connection if the received bytes from server (7.) do not match the generated bytes (6.) to prevent malfunction and test the encryption.
3. Server authentication 3. Server authentication
1. Server: Server certificate 1. Server: Server certificate
The client closes the connection if it does not trust the server certificate. The client closes the connection if it does not trust the server certificate.
4. Client authentication 4. Client authentication
1. Client: Client certificate 1. Client: Client certificate
The server closes the connection if the client is unauthorized. The server closes the connection if the client is unauthorized.
### Commands
#### `0x11`: Create topic
##### Request
`0x11<TOPIC_NAME>`, parameters:
- `<TOPIC_NAME>`: The name of the topic
##### Response
###### Status code
- `0xa1`: Topic successfully created
- `0xb4`: Unknown error creating the topic
- `0xb6`: Topic exists
- `0xb7`: No administrative rights
###### Content
Empty
Requires administrative rights.
#### `0x12`: Create record
##### Request
`0x12<TOPIC_NAME><CHECKSUM><TIMESTAMP><CONTENT>`, parameters:
- `<TOPIC_NAME>`: The name of the topic
- `<CHECKSUM>`: The CRC
- `<TIMESTAMP>`: The timestamp of the record (the milliseconds since the epoch, formatted as `int64`). If it is `0`, the timestamp the record was is at will be saved.
- `<CONTENT>`: The record content
##### Response
###### Status code
- `0xa1`: Record successfully created
- `0xb3`: Unknown error creating the record. Probably the CRC is invalid if the repetition byte is `0x31`
###### Content
Empty
#### `0x21`: Subscribe
##### Request
`0x21<TOPICS>`, parameters:
- `<TOPICS>`: The topics, comma-separated
##### Response
###### Status code
- `0xa2`: Successfully subscribed
- `0xb0`: Unknown error subscribing
- `0xb1`: At least one of the topics does not exist
###### Content
- Status code `0xa0`: Empty
- Status code `0xb1`: `<TOPICS>`, `<TOPICS>`: The topics that do not exist
#### `0x22`: Fetch records
##### Request
`0x22<START_TYPE><START><MAX_BYTES>`
- `<START_TYPE>`: One byte, if `0xc0`, the `<START>` is the timestamp of the first fetched event, if `0xc1` the offset.
- `<START>`: The first event timestamp or offset
- `<MAX_BYTES>`: Maximum bytes sent
##### Response
###### Status code
- `0xa0`: Success
- `0xb1`: The offset `<OFFSET>` does not exist
#### `0x31`: Remove topic
##### Request
`0x31<TOPIC_NAME>`, parameters:
- `<TOPIC_NAME>`: The name of the topic
##### Response
###### Status code
- `0xa2`: Topic successfully removed
- `0xb8`: Unknown deletion error
- `0xb9`: Unknown deletion error. The client should attempt to perform the operation again.
###### Content
Empty
Requires administrative rights.
### Status codes
The status code consists of two bytes.
1\. Byte (status byte): The status
2\. Byte (repetition byte): If the operation was unsuccessful, `0x31` if the client should try to perform the operation again, otherwise or if the operation was successful, `0x30`.
In this documentation, the *status code* always means the status byte, unless otherwise stated.
#### `0xa…`: Success
- `0xa0`: Reading successful
- `0xa1`: Creation successful
- `0xa2`: Deletion successful
#### `0xb…`: Error
- `0xb0`: Reading error
- `0xb1`: Reading error, object does not exist
- `0xb2`: Reading error, no permission
- `0xb3`: Creation error
- `0xb4`: Creation error, object already exists
- `0xb5`: Creation error, no permission
- `0xb6`: Deletion error
- `0xb7`: Deletion error, object does not exist
- `0xb8`: Deletion error, no permission
## Data storage
The topics consist of partitions and every partition consists of segments. A segment consists of three files,
- `<BASE_OFFSET>.log`
- `<BASE_OFFSET>.index`
- `<BASE_OFFSET>.timeindex`
`<BASE_OFFSET>`: The offset of the first record in the segment
In the `<BASE_OFFSET>.log` file, the records are saved.
In the `<BASE_OFFSET>.index` file, the positions (bytes) of every 1024th record are saved.
In the `<BASE_OFFSET>.timeindex` file, the timestampts of every 1024th record are saved.
### Data format
#### `<BASE_OFFSET>.log`
##### Record
| Field | Type | Length (bytes) | Description |
| - | - | - | - |
| **Offset** | `int64` | 8 | Offset |
| **Record Length** | `int32` | 4 | Length |
| **CRC** | `int32` | 4 | Checksum |
| **Attributes** | `byte` | 1 | Flags, e. g. Compression |
| **Timestamp** | `int64` | 8 | Milliseconds since Epoch |
| **Key Length** | `int32` | 4 | The length of the key |
| **Key** | `bytes` | -- | optional |
| **Payload length** | `int32` | 4 | The length of the payload |
| **Payload** | `bytes` | -- | Record payload |
| **Headers Count** | `int8` | 1 | Number of header pairs |
| **Header Key / Value** | `int32` | -- | Header pairs |
##### Header pairs
| Field | Type | Length (bytes) | Description |
| - | - | - | - |
| Header key length | `int8` | 4 | Length of the header name (key) |
| Header key | `bytes` | -- | Header name (key) |
| Header value length | `int8` | 4 | Length of the header value |
| Header value | `bytes` | -- | Header value |
+8
View File
@@ -81,6 +81,14 @@ async def main():
certfile.close() certfile.close()
await sendmsg(base64.b64decode(cert_data.replace(b'-----BEGIN CERTIFICATE-----', b'').replace(b'-----END CERTIFICATE-----', b'').strip()), writer, aesgcm, client_nonce) await sendmsg(base64.b64decode(cert_data.replace(b'-----BEGIN CERTIFICATE-----', b'').replace(b'-----END CERTIFICATE-----', b'').strip()), writer, aesgcm, client_nonce)
msg = await readmsg(reader, aesgcm, server_nonce)
if msg == b'SCD':
print('LOGIN SUCCEEDED')
elif msg == b'ERR':
print('LOGIN FAILED')
await sendmsg(b'\x11', writer, aesgcm, client_nonce)
print(await readmsg(reader, aesgcm, server_nonce))
except MessageFormatError: except MessageFormatError:
print('invalid message format') print('invalid message format')
Binary file not shown.
Binary file not shown.
Binary file not shown.
+1 -1
View File
@@ -13,7 +13,7 @@ def int_to_bytes(n: int, signed = False):
return n.to_bytes((n.bit_length() + 7) // 8, signed = signed) return n.to_bytes((n.bit_length() + 7) // 8, signed = signed)
async def sendmsg(m, writer: asyncio.StreamWriter, aesgcm: AESGCM = None, aesnonce = None, start_byte = MSG_START_BYTE): async def sendmsg(m, writer: asyncio.StreamWriter, aesgcm: AESGCM = None, aesnonce = None, start_byte = MSG_START_BYTE):
if type(m) == str: if type(m) == str:
m = m.encode() m = m.encode()
if aesgcm: if aesgcm:
m = aesgcm.encrypt(aesnonce, m, None) m = aesgcm.encrypt(aesnonce, m, None)
-19
View File
@@ -1,19 +0,0 @@
#!/usr/bin/env python3
import dbm
import sys
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import hashlib
if len(sys.argv) < 2:
print(f'{sys.argv[0]}: missing key hash')
sys.exit(1)
with dbm.open('server/config/clients/fingerprints', 'c') as db:
if bytes.fromhex(sys.argv[1]) not in db.keys():
print(f'{sys.argv[0]}: hash not registered')
sys.exit(1)
db[bytes.fromhex(sys.argv[1])] = input('New common name: ')
db.close()
-19
View File
@@ -1,19 +0,0 @@
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.getcwd())
import dbm
from lib.terminal_table import ascii_table
if len(sys.argv) < 2:
print(f'{sys.argv[0]}: missing key hash')
sys.exit(1)
with dbm.open('server/config/clients/fingerprints', 'c') as db:
try:
del db[bytes.fromhex(sys.argv[1])]
except:
print(f'{sys.argv[0]}: hash not registered')
sys.exit(1)
db.close()
+1
View File
@@ -0,0 +1 @@
947586a2a725bb7568fe221c9e8b443056df6213979a04ce2f38b82eeb2bd5a6
Binary file not shown.
+41 -2
View File
@@ -26,6 +26,34 @@ VERSION = 'jebp 1.0'
CERT_FILE = 'server/sec/server.crt.pem' CERT_FILE = 'server/sec/server.crt.pem'
CLIENT_CERT_ISSUER_NAME = 'jCloudCA-Root-CA' CLIENT_CERT_ISSUER_NAME = 'jCloudCA-Root-CA'
class Client:
def __init__(self, fingerprint: bytes, common_name: str, admin: bool = False):
self.fingerprint = fingerprint
self.common_name = common_name
self.admin = admin
def get_client_info(client_fingerprint: bytes):
with dbm.open('server/data/conf/client_fingerprints') as db:
if client_fingerprint not in db.keys():
raise KeyError('Client not registered')
common_name = db[client_fingerprint].decode()
db.close()
with open('server/data/conf/client_admin_rights') as carf:
admins = carf.read().split('\n')
carf.close()
return Client(fingerprint = client_fingerprint, common_name = common_name, admin = client_fingerprint.hex() in admins)
def process_command(command, client: Client):
print(client.fingerprint, client.common_name, client.admin)
response = b''
if command[0] == 0x11:
if not client.admin:
response = b'\xb7'
return response
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info('peername') addr = writer.get_extra_info('peername')
print(f'Connected to {addr}') print(f'Connected to {addr}')
@@ -68,16 +96,27 @@ async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWrit
cert = x509.load_der_x509_certificate(await readmsg(reader, aesgcm, client_nonce)) cert = x509.load_der_x509_certificate(await readmsg(reader, aesgcm, client_nonce))
key_hash = hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest() key_hash = hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest()
with dbm.open('server/config/clients/fingerprints', 'c') as db: with dbm.open('server/data/conf/client_fingerprints', 'c') as db:
if key_hash not in db.keys(): if key_hash not in db.keys():
await sendmsg('ERR', writer, aesgcm, server_nonce)
raise InvalidCertificateError('client not known') raise InvalidCertificateError('client not known')
if not validate_cert(cert, db[key_hash].decode(), CLIENT_CERT_ISSUER_NAME): if not validate_cert(cert, db[key_hash].decode(), CLIENT_CERT_ISSUER_NAME):
await sendmsg('ERR', writer, aesgcm, server_nonce)
raise InvalidCertificateError('client certificate not trusted') raise InvalidCertificateError('client certificate not trusted')
print('Client authenticated') await sendmsg('SCD', writer, aesgcm, server_nonce)
print('CLIENT AUTHENTICATED')
while True:
try:
await sendmsg(process_command(await readmsg(reader, aesgcm, client_nonce), get_client_info(hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest())), writer, aesgcm, server_nonce)
except Exception as e:
print(f'{str(type(e))[8:-2]}: {e}')
break
writer.close() writer.close()
await writer.wait_closed() await writer.wait_closed()
print(f'Connection to {addr} closed')
async def main(): async def main():
+56
View File
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
import dbm
import sys
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import hashlib
import os
if len(sys.argv) < 2:
print(f'{sys.argv[0]}: missing key hash')
sys.exit(1)
with dbm.open('server/data/conf/client_fingerprints', 'c') as db:
if bytes.fromhex(sys.argv[1]) not in db.keys():
print(f'{sys.argv[0]}: hash not registered')
sys.exit(1)
db[bytes.fromhex(sys.argv[1])] = input(f'New common name [{db[bytes.fromhex(sys.argv[1])].decode()}]: ') or db[bytes.fromhex(sys.argv[1])]
db.close()
if os.path.exists('server/data/conf/locks/client_admin_rights.lock'):
with open('server/data/conf/locks/client_admin_rights.lock', 'r') as lockf:
print(f'{sys.argv[0]}: admin rights file is locked by process {lockf.read()}')
lockf.close()
sys.exit(1)
try:
with open('server/data/conf/locks/client_admin_rights.lock', 'w') as lockf:
lockf.write(str(os.getpid()))
lockf.close()
with open('server/data/conf/client_admin_rights', 'r') as carf:
admins = {kh for kh in carf.read().split('\n') if kh}
carf.close()
admin = input(f'Is admin (y/n) [{'y' if sys.argv[1] in admins else 'n'}]: ') or ('y' if sys.argv[1] in admins else 'n')
if admin == 'y':
admin = True
else:
admin = False
if admin:
if sys.argv[1] not in admins:
print(f'Added {sys.argv[1]} to the admins')
admins.add(sys.argv[1])
else:
try:
admins.remove(sys.argv[1])
except KeyError:
pass
with open('server/data/conf/client_admin_rights', 'w') as carf:
carf.write('\n'.join(admins))
carf.close()
finally:
os.remove('server/data/conf/locks/client_admin_rights.lock')
+19
View File
@@ -0,0 +1,19 @@
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.getcwd())
import dbm
from lib.terminal_table import ascii_table
with open('server/data/conf/client_admin_rights', 'r') as carf:
admins = {kh for kh in carf.read().split('\n') if kh}
carf.close()
with dbm.open('server/data/conf/client_fingerprints', 'c') as db:
print(ascii_table([{
'Key Hash': k.hex(),
'Common name': v.decode(),
'Is admin': 'yes' if k.hex() in admins else 'no'
} for k, v in db.items()], ))
db.close()
@@ -17,6 +17,6 @@ except:
print(f'{sys.argv[0]}: invalid certificate') print(f'{sys.argv[0]}: invalid certificate')
sys.exit(1) sys.exit(1)
with dbm.open('server/config/clients/fingerprints', 'c') as db: with dbm.open('server/data/conf/client_fingerprints', 'c') as db:
db[hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest()] = sys.argv[1] db[hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest()] = sys.argv[1]
db.close() db.close()
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.getcwd())
import dbm
from lib.terminal_table import ascii_table
if len(sys.argv) < 2:
print(f'{sys.argv[0]}: missing key hash')
sys.exit(1)
with dbm.open('server/data/conf/client_fingerprints', 'c') as db:
try:
del db[bytes.fromhex(sys.argv[1])]
except:
print(f'{sys.argv[0]}: hash not registered')
sys.exit(1)
db.close()
if os.path.exists('server/data/conf/locks/client_admin_rights.lock'):
with open('server/data/conf/locks/client_admin_rights.lock', 'r') as lockf:
print(f'{sys.argv[0]}: admin rights file is locked by process {lockf.read()}')
lockf.close()
sys.exit(1)
try:
with open('server/data/conf/locks/client_admin_rights.lock', 'w') as lockf:
lockf.write(str(os.getpid()))
lockf.close()
with open('server/data/conf/client_admin_rights', 'r') as carf:
admins = {kh for kh in carf.read().split('\n') if kh}
carf.close()
if sys.argv[1] in admins:
admins.remove(sys.argv[1])
with open('server/data/conf/client_admin_rights', 'w') as carf:
carf.write('\n'.join(admins))
carf.close()
finally:
os.remove('server/data/conf/locks/client_admin_rights.lock')
@@ -5,10 +5,11 @@ import os
sys.path.append(os.getcwd()) sys.path.append(os.getcwd())
import dbm import dbm
from lib.terminal_table import ascii_table from lib.terminal_table import ascii_table
import pickle
with dbm.open('server/config/clients/fingerprints', 'c') as db: with dbm.open('server/data/conf/topics', 'c') as db:
print(ascii_table([{ print(ascii_table([{
'Key Hash': k.hex(), 'Name': k.decode(),
'Common name': v.decode() 'Partitions': str(pickle.loads(v['partitions'])),
} for k, v in db.items()], )) } for k, v in db.items()], ))
db.close() db.close()