Files
jakob.scheid b1ac351ad1 geändert: README.md
geändert:       client/client.py
	gelöscht:       lib/__pycache__/crypto_utils.cpython-313.pyc
	gelöscht:       lib/__pycache__/jebp_utils.cpython-313.pyc
	gelöscht:       lib/__pycache__/terminal_table.cpython-313.pyc
	geändert:       lib/jebp_utils.py
	gelöscht:       server/clients_management/chclient.py
	gelöscht:       server/clients_management/rmclient.py
	neue Datei:     server/data/conf/client_admin_rights
	umbenannt:      server/config/clients/fingerprints -> server/data/conf/client_fingerprints
	neue Datei:     server/data/conf/topics
	geändert:       server/server.py
	neue Datei:     server/utils/clients_management/chclient.py
	neue Datei:     server/utils/clients_management/lsclients.py
	umbenannt:      server/clients_management/mkclient.py -> server/utils/clients_management/mkclient.py
	neue Datei:     server/utils/clients_management/rmclient.py
	umbenannt:      server/clients_management/lsclients.py -> server/utils/topics_management/lstopics.py
2026-01-11 12:54:26 +01:00

103 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import os
sys.path.append(os.getcwd())
import asyncio
from lib.crypto_utils import (
generate_keypair,
serialize_public_key,
deserialize_public_key,
derive_aes_key,
)
from lib.jebp_utils import sendmsg, readmsg, MessageFormatError, InvalidCertificateError, validate_cert
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.backends import default_backend
from cryptography import x509
import base64
import dbm
HOST = "127.0.0.1"
PORT = 8888
KNOWN_PROTOCOLS = (b'jebp 1.0',)
SERVER_CERT_FILE = 'server/sec/server.crt.pem'
CLIENT_CERT_FILE = 'client/sec/client.crt.pem'
REQUIRED_CERT_COMMON_NAME = 'jeb'
REQUIRED_ISSUER_CERT_COMMON_NAME = 'jCloudCA-Root-CA'
async def main():
reader, writer = await asyncio.open_connection(HOST, PORT)
try:
try:
assert await readmsg(reader) in KNOWN_PROTOCOLS
except AssertionError:
print('Unknown protocol')
writer.close()
await writer.wait_closed()
return
# BEGIN ENCRYPTION HANDSHAKE
# 1. Client ECC keys
client_priv, client_pub = generate_keypair()
# 2. Send client public key
await sendmsg(serialize_public_key(client_pub), writer)
# 3. Receive server public key
server_pub_bytes = await readmsg(reader)
server_pub = deserialize_public_key(server_pub_bytes)
# 4. Derive shared AES key
aes_key = derive_aes_key(client_priv, server_pub)
aesgcm = AESGCM(aes_key)
client_nonce = os.urandom(12)
await sendmsg(client_nonce, writer)
server_nonce = await readmsg(reader)
test_bytes = os.urandom(32)
await sendmsg(test_bytes, writer, aesgcm, client_nonce)
rec = await readmsg(reader)
if rec != test_bytes:
raise Exception('encryption handshake failed')
await sendmsg(b'', writer)
# BEGIN SERVER AUTHENTICATION HANDSHAKE
cert_data = await readmsg(reader, aesgcm, server_nonce)
cert = x509.load_der_x509_certificate(cert_data, default_backend())
if not validate_cert(cert, REQUIRED_CERT_COMMON_NAME, REQUIRED_ISSUER_CERT_COMMON_NAME):
raise InvalidCertificateError('certificate not trusted')
# BEGIN CLIENT AUTHENTICATION HANDSHAKE
with open(CLIENT_CERT_FILE, 'rb') as certfile:
cert_data = certfile.read()
certfile.close()
await sendmsg(base64.b64decode(cert_data.replace(b'-----BEGIN CERTIFICATE-----', b'').replace(b'-----END CERTIFICATE-----', b'').strip()), writer, aesgcm, client_nonce)
msg = await readmsg(reader, aesgcm, server_nonce)
if msg == b'SCD':
print('LOGIN SUCCEEDED')
elif msg == b'ERR':
print('LOGIN FAILED')
await sendmsg(b'\x11', writer, aesgcm, client_nonce)
print(await readmsg(reader, aesgcm, server_nonce))
except MessageFormatError:
print('invalid message format')
except Exception as e:
print(f'{str(type(e))[8:-2]}: {e}')
finally:
writer.close()
await writer.wait_closed()
return
asyncio.run(main())