diff --git a/README.md b/README.md index fcc0d70..9e39e06 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,167 @@ -# jeb - -jCloud Event Bus - -## jebp (jCloud Event Bus Protocol) - -### Messages -All messages begin with `0x01`, followed by the content length length. The content length length is the length of the content length and representated by one byte. Then follows the content length and after that the message content. - -### Handshake -1. Server: Server protocol, version -2. Encryption - 1. Client: Serialized client public key - 2. Server: Serialized server public key - 3. Client: Client nonce - 4. Server: Server nonce - 5. Client: Random bytes (encrypted) - 6. Server: received bytes from client (not encrypted) - - The client closes the connection if the received bytes from server (7.) do not match the generated bytes (6.) to prevent malfunction and test the encryption. -3. Server authentication - 1. Server: Server certificate - - The client closes the connection if it does not trust the server certificate. -4. Client authentication - 1. Client: Client certificate - - The server closes the connection if the client is unauthorized. \ No newline at end of file +# jeb + +jCloud Event Bus + +## jebp (jCloud Event Bus Protocol) + +### Messages +All messages begin with `0x01`, followed by the content length length. The content length length is the length of the content length and representated by one byte. Then follows the content length and after that the message content. + +### Handshake +1. Server: Server protocol, version +2. Encryption + 1. Client: Serialized client public key + 2. Server: Serialized server public key + 3. Client: Client nonce + 4. Server: Server nonce + 5. Client: Random bytes (encrypted) + 6. Server: received bytes from client (not encrypted) + + The client closes the connection if the received bytes from server (7.) do not match the generated bytes (6.) to prevent malfunction and test the encryption. +3. Server authentication + 1. Server: Server certificate + + The client closes the connection if it does not trust the server certificate. +4. Client authentication + 1. Client: Client certificate + + The server closes the connection if the client is unauthorized. + + +### Commands +#### `0x11`: Create topic +##### Request +`0x11`, parameters: +- ``: The name of the topic +##### Response +###### Status code +- `0xa1`: Topic successfully created +- `0xb4`: Unknown error creating the topic +- `0xb6`: Topic exists +- `0xb7`: No administrative rights +###### Content +Empty + +Requires administrative rights. + + +#### `0x12`: Create record +##### Request +`0x12`, parameters: +- ``: The name of the topic +- ``: The CRC +- ``: The timestamp of the record (the milliseconds since the epoch, formatted as `int64`). If it is `0`, the timestamp the record was is at will be saved. +- ``: The record content +##### Response +###### Status code +- `0xa1`: Record successfully created +- `0xb3`: Unknown error creating the record. Probably the CRC is invalid if the repetition byte is `0x31` +###### Content +Empty + + +#### `0x21`: Subscribe +##### Request +`0x21`, parameters: +- ``: The topics, comma-separated +##### Response +###### Status code +- `0xa2`: Successfully subscribed +- `0xb0`: Unknown error subscribing +- `0xb1`: At least one of the topics does not exist +###### Content +- Status code `0xa0`: Empty +- Status code `0xb1`: ``, ``: The topics that do not exist + + +#### `0x22`: Fetch records +##### Request +`0x22` +- ``: One byte, if `0xc0`, the `` is the timestamp of the first fetched event, if `0xc1` the offset. +- ``: The first event timestamp or offset +- ``: Maximum bytes sent +##### Response +###### Status code +- `0xa0`: Success +- `0xb1`: The offset `` does not exist + +#### `0x31`: Remove topic +##### Request +`0x31`, parameters: +- ``: The name of the topic +##### Response +###### Status code +- `0xa2`: Topic successfully removed +- `0xb8`: Unknown deletion error +- `0xb9`: Unknown deletion error. The client should attempt to perform the operation again. +###### Content +Empty + +Requires administrative rights. + + +### Status codes +The status code consists of two bytes. + +1\. Byte (status byte): The status + +2\. Byte (repetition byte): If the operation was unsuccessful, `0x31` if the client should try to perform the operation again, otherwise or if the operation was successful, `0x30`. + +In this documentation, the *status code* always means the status byte, unless otherwise stated. + +#### `0xa…`: Success +- `0xa0`: Reading successful +- `0xa1`: Creation successful +- `0xa2`: Deletion successful +#### `0xb…`: Error +- `0xb0`: Reading error +- `0xb1`: Reading error, object does not exist +- `0xb2`: Reading error, no permission +- `0xb3`: Creation error +- `0xb4`: Creation error, object already exists +- `0xb5`: Creation error, no permission +- `0xb6`: Deletion error +- `0xb7`: Deletion error, object does not exist +- `0xb8`: Deletion error, no permission + + + + +## Data storage +The topics consist of partitions and every partition consists of segments. A segment consists of three files, +- `.log` +- `.index` +- `.timeindex` + +``: The offset of the first record in the segment + +In the `.log` file, the records are saved. + +In the `.index` file, the positions (bytes) of every 1024th record are saved. + +In the `.timeindex` file, the timestampts of every 1024th record are saved. + +### Data format +#### `.log` +##### Record +| Field | Type | Length (bytes) | Description | +| - | - | - | - | +| **Offset** | `int64` | 8 | Offset | +| **Record Length** | `int32` | 4 | Length | +| **CRC** | `int32` | 4 | Checksum | +| **Attributes** | `byte` | 1 | Flags, e. g. Compression | +| **Timestamp** | `int64` | 8 | Milliseconds since Epoch | +| **Key Length** | `int32` | 4 | The length of the key | +| **Key** | `bytes` | -- | optional | +| **Payload length** | `int32` | 4 | The length of the payload | +| **Payload** | `bytes` | -- | Record payload | +| **Headers Count** | `int8` | 1 | Number of header pairs | +| **Header Key / Value** | `int32` | -- | Header pairs | + +##### Header pairs +| Field | Type | Length (bytes) | Description | +| - | - | - | - | +| Header key length | `int8` | 4 | Length of the header name (key) | +| Header key | `bytes` | -- | Header name (key) | +| Header value length | `int8` | 4 | Length of the header value | +| Header value | `bytes` | -- | Header value | \ No newline at end of file diff --git a/client/client.py b/client/client.py index ebdb73a..b076091 100755 --- a/client/client.py +++ b/client/client.py @@ -81,6 +81,14 @@ async def main(): certfile.close() await sendmsg(base64.b64decode(cert_data.replace(b'-----BEGIN CERTIFICATE-----', b'').replace(b'-----END CERTIFICATE-----', b'').strip()), writer, aesgcm, client_nonce) + msg = await readmsg(reader, aesgcm, server_nonce) + if msg == b'SCD': + print('LOGIN SUCCEEDED') + elif msg == b'ERR': + print('LOGIN FAILED') + + await sendmsg(b'\x11', writer, aesgcm, client_nonce) + print(await readmsg(reader, aesgcm, server_nonce)) except MessageFormatError: print('invalid message format') diff --git a/lib/__pycache__/crypto_utils.cpython-313.pyc b/lib/__pycache__/crypto_utils.cpython-313.pyc deleted file mode 100644 index 621c2a7..0000000 Binary files a/lib/__pycache__/crypto_utils.cpython-313.pyc and /dev/null differ diff --git a/lib/__pycache__/jebp_utils.cpython-313.pyc b/lib/__pycache__/jebp_utils.cpython-313.pyc deleted file mode 100644 index f1cb593..0000000 Binary files a/lib/__pycache__/jebp_utils.cpython-313.pyc and /dev/null differ diff --git a/lib/__pycache__/terminal_table.cpython-313.pyc b/lib/__pycache__/terminal_table.cpython-313.pyc deleted file mode 100644 index 5b4fa01..0000000 Binary files a/lib/__pycache__/terminal_table.cpython-313.pyc and /dev/null differ diff --git a/lib/jebp_utils.py b/lib/jebp_utils.py index ae5146d..9cfc1ba 100644 --- a/lib/jebp_utils.py +++ b/lib/jebp_utils.py @@ -13,7 +13,7 @@ def int_to_bytes(n: int, signed = False): return n.to_bytes((n.bit_length() + 7) // 8, signed = signed) async def sendmsg(m, writer: asyncio.StreamWriter, aesgcm: AESGCM = None, aesnonce = None, start_byte = MSG_START_BYTE): - if type(m) == str: + if type(m) == str: m = m.encode() if aesgcm: m = aesgcm.encrypt(aesnonce, m, None) diff --git a/server/clients_management/chclient.py b/server/clients_management/chclient.py deleted file mode 100755 index 7625b64..0000000 --- a/server/clients_management/chclient.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python3 - -import dbm -import sys -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -import hashlib - -if len(sys.argv) < 2: - print(f'{sys.argv[0]}: missing key hash') - sys.exit(1) - -with dbm.open('server/config/clients/fingerprints', 'c') as db: - if bytes.fromhex(sys.argv[1]) not in db.keys(): - print(f'{sys.argv[0]}: hash not registered') - sys.exit(1) - db[bytes.fromhex(sys.argv[1])] = input('New common name: ') - db.close() \ No newline at end of file diff --git a/server/clients_management/rmclient.py b/server/clients_management/rmclient.py deleted file mode 100755 index 2f08a2d..0000000 --- a/server/clients_management/rmclient.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import os -sys.path.append(os.getcwd()) -import dbm -from lib.terminal_table import ascii_table - -if len(sys.argv) < 2: - print(f'{sys.argv[0]}: missing key hash') - sys.exit(1) - -with dbm.open('server/config/clients/fingerprints', 'c') as db: - try: - del db[bytes.fromhex(sys.argv[1])] - except: - print(f'{sys.argv[0]}: hash not registered') - sys.exit(1) - db.close() \ No newline at end of file diff --git a/server/data/conf/client_admin_rights b/server/data/conf/client_admin_rights new file mode 100644 index 0000000..aa961ca --- /dev/null +++ b/server/data/conf/client_admin_rights @@ -0,0 +1 @@ +947586a2a725bb7568fe221c9e8b443056df6213979a04ce2f38b82eeb2bd5a6 \ No newline at end of file diff --git a/server/config/clients/fingerprints b/server/data/conf/client_fingerprints similarity index 98% rename from server/config/clients/fingerprints rename to server/data/conf/client_fingerprints index 9a3a8d2..e3b1efd 100644 Binary files a/server/config/clients/fingerprints and b/server/data/conf/client_fingerprints differ diff --git a/server/data/conf/topics b/server/data/conf/topics new file mode 100644 index 0000000..8a76b24 Binary files /dev/null and b/server/data/conf/topics differ diff --git a/server/server.py b/server/server.py index 5ba0135..e3b666a 100755 --- a/server/server.py +++ b/server/server.py @@ -26,6 +26,34 @@ VERSION = 'jebp 1.0' CERT_FILE = 'server/sec/server.crt.pem' CLIENT_CERT_ISSUER_NAME = 'jCloudCA-Root-CA' +class Client: + def __init__(self, fingerprint: bytes, common_name: str, admin: bool = False): + self.fingerprint = fingerprint + self.common_name = common_name + self.admin = admin + +def get_client_info(client_fingerprint: bytes): + with dbm.open('server/data/conf/client_fingerprints') as db: + if client_fingerprint not in db.keys(): + raise KeyError('Client not registered') + common_name = db[client_fingerprint].decode() + db.close() + with open('server/data/conf/client_admin_rights') as carf: + admins = carf.read().split('\n') + carf.close() + return Client(fingerprint = client_fingerprint, common_name = common_name, admin = client_fingerprint.hex() in admins) + + +def process_command(command, client: Client): + print(client.fingerprint, client.common_name, client.admin) + response = b'' + if command[0] == 0x11: + if not client.admin: + response = b'\xb7' + return response + + + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): addr = writer.get_extra_info('peername') print(f'Connected to {addr}') @@ -68,16 +96,27 @@ async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWrit cert = x509.load_der_x509_certificate(await readmsg(reader, aesgcm, client_nonce)) key_hash = hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest() - with dbm.open('server/config/clients/fingerprints', 'c') as db: + with dbm.open('server/data/conf/client_fingerprints', 'c') as db: if key_hash not in db.keys(): + await sendmsg('ERR', writer, aesgcm, server_nonce) raise InvalidCertificateError('client not known') if not validate_cert(cert, db[key_hash].decode(), CLIENT_CERT_ISSUER_NAME): + await sendmsg('ERR', writer, aesgcm, server_nonce) raise InvalidCertificateError('client certificate not trusted') - print('Client authenticated') + await sendmsg('SCD', writer, aesgcm, server_nonce) + print('CLIENT AUTHENTICATED') + + while True: + try: + await sendmsg(process_command(await readmsg(reader, aesgcm, client_nonce), get_client_info(hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest())), writer, aesgcm, server_nonce) + except Exception as e: + print(f'{str(type(e))[8:-2]}: {e}') + break writer.close() await writer.wait_closed() + print(f'Connection to {addr} closed') async def main(): diff --git a/server/utils/clients_management/chclient.py b/server/utils/clients_management/chclient.py new file mode 100755 index 0000000..a4358b8 --- /dev/null +++ b/server/utils/clients_management/chclient.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +import dbm +import sys +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +import hashlib +import os + +if len(sys.argv) < 2: + print(f'{sys.argv[0]}: missing key hash') + sys.exit(1) + +with dbm.open('server/data/conf/client_fingerprints', 'c') as db: + if bytes.fromhex(sys.argv[1]) not in db.keys(): + print(f'{sys.argv[0]}: hash not registered') + sys.exit(1) + db[bytes.fromhex(sys.argv[1])] = input(f'New common name [{db[bytes.fromhex(sys.argv[1])].decode()}]: ') or db[bytes.fromhex(sys.argv[1])] + db.close() + +if os.path.exists('server/data/conf/locks/client_admin_rights.lock'): + with open('server/data/conf/locks/client_admin_rights.lock', 'r') as lockf: + print(f'{sys.argv[0]}: admin rights file is locked by process {lockf.read()}') + lockf.close() + sys.exit(1) + +try: + with open('server/data/conf/locks/client_admin_rights.lock', 'w') as lockf: + lockf.write(str(os.getpid())) + lockf.close() + + with open('server/data/conf/client_admin_rights', 'r') as carf: + admins = {kh for kh in carf.read().split('\n') if kh} + carf.close() + + admin = input(f'Is admin (y/n) [{'y' if sys.argv[1] in admins else 'n'}]: ') or ('y' if sys.argv[1] in admins else 'n') + if admin == 'y': + admin = True + else: + admin = False + if admin: + if sys.argv[1] not in admins: + print(f'Added {sys.argv[1]} to the admins') + admins.add(sys.argv[1]) + else: + try: + admins.remove(sys.argv[1]) + except KeyError: + pass + with open('server/data/conf/client_admin_rights', 'w') as carf: + carf.write('\n'.join(admins)) + carf.close() + +finally: + os.remove('server/data/conf/locks/client_admin_rights.lock') \ No newline at end of file diff --git a/server/utils/clients_management/lsclients.py b/server/utils/clients_management/lsclients.py new file mode 100755 index 0000000..cc256b7 --- /dev/null +++ b/server/utils/clients_management/lsclients.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +import sys +import os +sys.path.append(os.getcwd()) +import dbm +from lib.terminal_table import ascii_table + +with open('server/data/conf/client_admin_rights', 'r') as carf: + admins = {kh for kh in carf.read().split('\n') if kh} + carf.close() + +with dbm.open('server/data/conf/client_fingerprints', 'c') as db: + print(ascii_table([{ + 'Key Hash': k.hex(), + 'Common name': v.decode(), + 'Is admin': 'yes' if k.hex() in admins else 'no' + } for k, v in db.items()], )) + db.close() \ No newline at end of file diff --git a/server/clients_management/mkclient.py b/server/utils/clients_management/mkclient.py similarity index 88% rename from server/clients_management/mkclient.py rename to server/utils/clients_management/mkclient.py index cd61807..5a62db6 100755 --- a/server/clients_management/mkclient.py +++ b/server/utils/clients_management/mkclient.py @@ -17,6 +17,6 @@ except: print(f'{sys.argv[0]}: invalid certificate') sys.exit(1) -with dbm.open('server/config/clients/fingerprints', 'c') as db: +with dbm.open('server/data/conf/client_fingerprints', 'c') as db: db[hashlib.sha256(cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)).digest()] = sys.argv[1] db.close() \ No newline at end of file diff --git a/server/utils/clients_management/rmclient.py b/server/utils/clients_management/rmclient.py new file mode 100755 index 0000000..956a051 --- /dev/null +++ b/server/utils/clients_management/rmclient.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +import sys +import os +sys.path.append(os.getcwd()) +import dbm +from lib.terminal_table import ascii_table + +if len(sys.argv) < 2: + print(f'{sys.argv[0]}: missing key hash') + sys.exit(1) + +with dbm.open('server/data/conf/client_fingerprints', 'c') as db: + try: + del db[bytes.fromhex(sys.argv[1])] + except: + print(f'{sys.argv[0]}: hash not registered') + sys.exit(1) + db.close() + +if os.path.exists('server/data/conf/locks/client_admin_rights.lock'): + with open('server/data/conf/locks/client_admin_rights.lock', 'r') as lockf: + print(f'{sys.argv[0]}: admin rights file is locked by process {lockf.read()}') + lockf.close() + sys.exit(1) + +try: + with open('server/data/conf/locks/client_admin_rights.lock', 'w') as lockf: + lockf.write(str(os.getpid())) + lockf.close() + + with open('server/data/conf/client_admin_rights', 'r') as carf: + admins = {kh for kh in carf.read().split('\n') if kh} + carf.close() + + if sys.argv[1] in admins: + admins.remove(sys.argv[1]) + with open('server/data/conf/client_admin_rights', 'w') as carf: + carf.write('\n'.join(admins)) + carf.close() + +finally: + os.remove('server/data/conf/locks/client_admin_rights.lock') \ No newline at end of file diff --git a/server/clients_management/lsclients.py b/server/utils/topics_management/lstopics.py similarity index 52% rename from server/clients_management/lsclients.py rename to server/utils/topics_management/lstopics.py index 0809af8..997b400 100755 --- a/server/clients_management/lsclients.py +++ b/server/utils/topics_management/lstopics.py @@ -5,10 +5,11 @@ import os sys.path.append(os.getcwd()) import dbm from lib.terminal_table import ascii_table +import pickle -with dbm.open('server/config/clients/fingerprints', 'c') as db: +with dbm.open('server/data/conf/topics', 'c') as db: print(ascii_table([{ - 'Key Hash': k.hex(), - 'Common name': v.decode() + 'Name': k.decode(), + 'Partitions': str(pickle.loads(v['partitions'])), } for k, v in db.items()], )) db.close() \ No newline at end of file