This is the project where TLS finally stopped feeling like a magic incantation. The goal was a chat application in Python with encrypted communication. I could have wrapped socket.socket in ssl.wrap_socket and called it done. Instead I built it from the cert generator up, watched the handshake in Wireshark, and wrote down which flags I was setting and which defaults I was trusting. What follows is the same writeup, restructured for a blog: walkthrough first, code second, threat model third.
What I was trying to learn
The learning objectives were the usual mix: how TLS secures client-server traffic, how to set up SSL certificates in Python, how to implement and test a TLS-secured chat with socket and ssl, how to analyse encrypted traffic in Wireshark, and how to be honest about the limits of a base TLS implementation. The personal goal underneath that was simpler — to be able to defend every line of code I wrote.
Why TLS, briefly
TLS (Transport Layer Security) is a cryptographic protocol that protects traffic between two endpoints on a network. It combines an asymmetric handshake (where client and server negotiate parameters and verify certificates: normally the server's, and the client's as well when mutual TLS is configured) with a symmetric session cipher (where the actual traffic is encrypted using keys derived from a secret agreed during that handshake). The result is confidentiality, integrity, and authentication on a channel that's otherwise just bytes on a wire.
TLS replaced SSL (Secure Sockets Layer). The last SSL version, SSL 3.0, dates from 1996, and it accumulated enough vulnerabilities that every SSL version is now formally deprecated. The Python module is still called ssl for historical reasons, but the protocol it speaks today is TLS.
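To make the "module is called ssl, protocol is TLS" point concrete, here is a small sketch that inspects what Python's default context will and won't negotiate. The helper name `default_context_summary` is made up for illustration. The reported minimum version depends on the Python and OpenSSL build, so it is printed rather than assumed.

```python
import ssl


def default_context_summary():
    """Report the security-relevant defaults of ssl.create_default_context()."""
    ctx = ssl.create_default_context()  # client-side defaults (Purpose.SERVER_AUTH)
    return {
        # SSLv3 is switched off via an option flag on the context
        "sslv3_disabled": bool(ctx.options & ssl.OP_NO_SSLv3),
        # Lowest protocol version the context will accept (build-dependent)
        "minimum_version": ctx.minimum_version.name,
        "verify_mode": ctx.verify_mode.name,
        "check_hostname": ctx.check_hostname,
    }


if __name__ == "__main__":
    for key, value in default_context_summary().items():
        print(f"{key}: {value}")
```

The two settings worth remembering are `verify_mode` and `check_hostname`: both are on by default for a client context, and everything later in this post that touches them is a deliberate override.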


Project shape
The deliverable is a client-server chat where the server listens for TLS connections and each client opens its own secure socket. Once the handshake completes, both sides exchange messages encrypted with a session key negotiated during the handshake. The server fans messages out to every other connected client. Architecturally: nothing exotic — a threaded TCP server with TLS bolted on, and a thin client that runs two threads (one for stdin, one for the socket).
The full source — generate_certs.py, server.py, client.py — lives on GitHub. The walkthrough below is the same shape as my original writeup: certs, then server, then client, then a run.
Generating self-signed certificates
In a TLS session, certificates do two jobs: they carry a public key, and they let the other side verify the identity bound to that key. The matching private key never leaves the endpoint it belongs to. That asymmetry is what stops a man-in-the-middle from impersonating either side: they can intercept the bytes, and even replay the certificate (it's public), but they can't complete the handshake without proving possession of the matching private key.
In production, certificates are issued by a Certificate Authority: Let's Encrypt, an internal PKI, anyone whose root is already in your trust store. For a local project that lives entirely on localhost, I generate them myself. Each endpoint is its own issuer, and each is configured by hand to trust the other's cert. The trust boundary is my laptop, and that's acceptable for what this is.
I wrote generate_certs.py using the cryptography package because openssl hides too much of the structure. I wanted every field of the X.509 certificate to be a line I wrote.
1. Imports
from datetime import datetime, timedelta, timezone
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
2. Generate an RSA private key
RSA-2048 with the standard public exponent. The private key is what signs everything else and what the matching certificate makes claims about.
# 1. Generate a private key
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
)
3. Derive the public key
The public key is derived from the private key; only the public key goes into the certificate. The private key stays on disk in {prefix}.key and is loaded by whichever process owns this identity.
# 2. Generate the public key from the private key
public_key = private_key.public_key()
4. Define the subject
The certificate's Common Name (CN) is localhost for the server and client.localhost for the client. CN-based identification is legacy; modern verification leans on the Subject Alternative Name extension we add in the next step. But it costs nothing to set both, and it makes the cert legible when you inspect it.
# 3. Define the certificate subject
subject = issuer = x509.Name([
    x509.NameAttribute(NameOID.COMMON_NAME, u"localhost")
])
# Different CN for the client cert
if cert_name_prefix == "client":
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"client.localhost"),
    ])
5. Build the X.509 certificate
Subject and issuer are the same — self-signed. Random serial number. Validity window of exactly one year. A SubjectAlternativeName extension pinning the cert to localhost as a DNS name. Finally, sign the whole thing with SHA-256 and the private key.
# 4. Build the certificate
cert = x509.CertificateBuilder().subject_name(subject
).issuer_name(issuer
).public_key(public_key
).serial_number(x509.random_serial_number()
).not_valid_before(datetime.now(timezone.utc)
).not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)
).add_extension(
    x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
    critical=False,
).sign(private_key, hashes.SHA256())
6. Write the files
PEM format, unencrypted private key (acceptable for local dev, not for anywhere real), PKCS#8 encoding. After this runs, the working directory has {prefix}.key and {prefix}.crt.
# 5. Save the private key and certificate in PEM format
with open(f"{cert_name_prefix}.key", "wb") as f:
    f.write(private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ))
with open(f"{cert_name_prefix}.crt", "wb") as f:
    f.write(cert.public_bytes(serialization.Encoding.PEM))
7. Generate both identities
Call generate_cert twice — once for the server, once for the client. End result: four files in the working directory.
if __name__ == "__main__":
    generate_cert("server")
    generate_cert("client")
A self-signed certificate isn't inherently insecure. It's untrusted by default, because the system trust store doesn't know it. The moment you explicitly load it as a trust anchor on the other side, it's exactly as trustworthy as the channel you used to ship it.
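Since the point of writing the generator by hand was to own every field, it helps to read those fields back. The sketch below builds a certificate shaped like the one above in memory and then checks three things: subject equals issuer, the SAN contains localhost, and the signature verifies against the certificate's own public key. The helper names `make_self_signed` and `describe` are made up for illustration, and the signature check assumes an RSA key with PKCS#1 v1.5 padding, which is what signing an RSA certificate with SHA-256 produces here.

```python
from datetime import datetime, timedelta, timezone

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID


def make_self_signed(common_name):
    """Build an in-memory self-signed cert with the same fields as generate_certs.py."""
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    now = datetime.now(timezone.utc)
    return (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)  # same name on both sides: self-signed
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName("localhost")]),
            critical=False,
        )
        .sign(key, hashes.SHA256())
    )


def describe(cert):
    """Summarise a certificate; raises InvalidSignature if it is not self-signed."""
    # Verify the signature with the cert's own public key (RSA + PKCS#1 v1.5 assumed)
    cert.public_key().verify(
        cert.signature,
        cert.tbs_certificate_bytes,
        padding.PKCS1v15(),
        cert.signature_hash_algorithm,
    )
    san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    return {
        "self_signed": cert.subject == cert.issuer,
        "common_name": common_names[0].value,
        "dns_names": san.value.get_values_for_type(x509.DNSName),
    }


if __name__ == "__main__":
    print(describe(make_self_signed("localhost")))
```

To inspect the files on disk instead, `x509.load_pem_x509_certificate(open("server.crt", "rb").read())` returns the same kind of object that `describe` expects.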
The server
server.py is the heart of the application. It binds a TCP socket, configures an SSL context, accepts connections, upgrades each one to TLS, and fans messages out to every other connected client. Threaded, one thread per client.
Imports
import socket
import ssl
import threading
import logging
from datetime import datetime
Socket and SSL context
Standard TCP socket, SO_REUSEADDR so I don't have to wait for the kernel to release the port between runs, bind, listen. None of that is TLS yet. The SSL context is where the security posture is set: Purpose.CLIENT_AUTH picks server-side defaults, verify_mode = CERT_REQUIRED forces mutual TLS, load_cert_chain gives the server its identity, load_verify_locations tells it which client certificates to trust.
# Create base socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(5)
# Create SSL context
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_cert_chain(certfile="server.crt", keyfile="server.key")
ssl_context.load_verify_locations("client.crt")
Accept and wrap
Each accepted socket is wrapped with wrap_socket(..., server_side=True) inside its worker thread, not before it. The TLS handshake blocks; if I wrapped on the accept loop's thread, a slow client could stall every other handshake. Pushing the wrap into the worker isolates that cost.
client_socket, address = server_socket.accept()
# ...later, inside the worker thread that owns this connection:
ssl_socket = ssl_context.wrap_socket(client_socket, server_side=True)
Threading
Daemon threads so they die with the process. Each thread owns its socket end-to-end.
client_thread = threading.Thread(target=handle_client, args=(client_socket, address))
client_thread.daemon = True
client_thread.start()
Receive and broadcast
Inside the worker, read 1024 bytes at a time. Decode. Timestamp. Re-encode and send to every other socket in the list. The CPython GIL is doing some of the heavy lifting here: list.append and list.remove are each atomic in CPython on a single list, so I didn't reach for a lock. That only covers individual operations. Iterating the list while another thread (or the loop itself) removes from it can skip a client, which is the part I'd revisit if this were anything more than a demo.
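If I did revisit it, a lock-guarded registry is the obvious shape. This is a minimal sketch, not what server.py does: the class name `ClientRegistry` and its methods are hypothetical. It takes a snapshot of the client list under the lock, sends outside the lock so one slow socket cannot block joins and leaves, and drops any socket whose send fails.

```python
import threading


class ClientRegistry:
    """Hypothetical helper: a lock-guarded list of connected sockets."""

    def __init__(self):
        self._lock = threading.Lock()
        self._clients = []

    def add(self, sock):
        with self._lock:
            self._clients.append(sock)

    def discard(self, sock):
        with self._lock:
            if sock in self._clients:
                self._clients.remove(sock)

    def count(self):
        with self._lock:
            return len(self._clients)

    def broadcast(self, data, sender=None):
        """Send data to every client except sender; return how many sends succeeded."""
        # Snapshot under the lock so iteration never races with add/discard
        with self._lock:
            targets = [c for c in self._clients if c is not sender]
        delivered = 0
        for client in targets:
            try:
                client.sendall(data)
                delivered += 1
            except OSError:  # ssl.SSLError is a subclass of OSError
                self.discard(client)
        return delivered
```

Anything with a `sendall` method works here, so the same class serves plain sockets, TLS sockets, and test doubles.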
msg = ssl_socket.recv(1024).decode('utf-8')
for client in clients:
    if client != sender:
        client.send(msg.encode('utf-8'))
Logging
INFO-level logging with timestamps. Every new connection, every disconnection, every broadcast. When something breaks at the handshake layer, this is the only window into what happened.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
logger.info("Message log...")
logger.error("Error...")
The client
client.py is the mirror image of the server: TCP socket, SSL context, wrap, connect, then two threads — one reads stdin and sends, one reads the socket and prints. There's one line I want to call out because it would be a real bug anywhere outside a local demo.
Setup and wrap
# Create base socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Create SSL context
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_context.load_cert_chain(certfile="client.crt", keyfile="client.key")
ssl_context.check_hostname = False # for development only
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations("server.crt")
# Wrap socket with SSL
ssl_socket = ssl_context.wrap_socket(client_socket, server_hostname=host)
check_hostname = False is the dangerous line. Hostname verification is what ties a certificate to the name you dialled: it stops "any cert in my trust store" from becoming "any host can present any of those certs and I'll believe them." I disabled it because the self-signed setup's CN matching gets fussy and this project isn't about debugging PKI. In production with a real CA, that line is deleted.
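For comparison, here is a sketch of a stricter client context that leaves hostname checking on. Two caveats make this an assumption rather than a drop-in fix. First, ssl.create_default_context(ssl.Purpose.SERVER_AUTH) also loads the system's default CA certificates when no CA file is passed, so the context above trusts more than server.crt. The sketch starts from a bare ssl.SSLContext to avoid that. Second, because the generated certificate carries localhost in its Subject Alternative Name and the client dials localhost, hostname checking should plausibly pass, but I have not re-run the project this way. The function name `make_pinned_client_context` is made up for illustration.

```python
import ssl


def make_pinned_client_context(ca_file=None, cert_file=None, key_file=None):
    """Client context that trusts only ca_file and keeps hostname checks on."""
    # PROTOCOL_TLS_CLIENT enables CERT_REQUIRED and check_hostname by default,
    # and a bare SSLContext starts with an empty trust store.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if ca_file:
        # The only trust anchor: e.g. the self-signed server.crt
        ctx.load_verify_locations(cafile=ca_file)
    if cert_file:
        # Client identity for mutual TLS: e.g. client.crt / client.key
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx
```

In client.py the call would look like `make_pinned_client_context("server.crt", "client.crt", "client.key")`, followed by the same `wrap_socket(..., server_hostname=host)` as before.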
Connect
ssl_socket.connect((host, port))
Send messages
Read from stdin, prepend the username, encode as UTF-8, send. There's no message framing — I lean on the fact that send calls are short and arrive intact under local-only load. TCP being a stream and not a message queue is a real bug waiting to happen, and I name it in the limitations section.
message = input()
formatted_message = f"{username}: {message}"
ssl_socket.send(formatted_message.encode('utf-8'))
Receive messages
A second thread sits in a blocking recv loop. When a message lands, it prints it without clobbering the user's in-progress input — \r\033[K wipes the current line, the message gets printed, the > prompt is re-emitted.
# Receive messages
message = self.ssl_socket.recv(1024).decode('utf-8')
# Start a receiving thread
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
Every "for development only" flag is a future production bug waiting for someone to copy-paste it. Leaving the line with a visible comment is the honest move; silently turning safety off is the dishonest one.
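One more wrinkle in the receive loop: recv(1024) followed by decode('utf-8') can raise UnicodeDecodeError if a chunk boundary lands in the middle of a multi-byte character. A sketch of the standard-library way around that, using an incremental decoder that holds back incomplete bytes until the rest arrives. The generator name `iter_text` is hypothetical, and `chunks` stands in for successive recv results.

```python
import codecs


def iter_text(chunks):
    """Decode an iterable of byte chunks as UTF-8, tolerating split characters."""
    decoder = codecs.getincrementaldecoder("utf-8")(errors="strict")
    for chunk in chunks:
        text = decoder.decode(chunk)  # buffers a trailing partial character
        if text:
            yield text
    # final=True raises UnicodeDecodeError if the stream ended mid-character
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail


if __name__ == "__main__":
    data = "héllo ✓".encode("utf-8")
    # Split inside both multi-byte characters on purpose
    pieces = [data[:2], data[2:8], data[8:]]
    print("".join(iter_text(pieces)))
```

This fixes decoding only. It does not restore message boundaries, which is the separate framing problem listed in the limitations.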
Running it
Three terminals. Cert generation first, then the server, then one or more clients.
$ python generate_certs.py
This drops server.crt, server.key, client.crt, and client.key in the working directory. Then the server:
$ python server.py
You'll see a log line for "Server listening on localhost:8443" and then nothing until a client connects. In separate terminals, run as many clients as you want:
$ python client.py
Each client prompts for a username, broadcasts a join message, and then drops you into the chat. The server log shows each message being received, timestamped, and relayed; the clients see each other's messages in real time. The actual bytes on the wire, which I confirmed in Wireshark, are unintelligible to anyone tapping the connection.
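Wireshark shows what an outsider sees. The socket itself can report what was negotiated, which is a useful cross-check against the capture. A small sketch, with `describe_session` as a made-up helper name. It relies only on the version() and cipher() methods that wrapped TLS sockets provide.

```python
def describe_session(tls_sock):
    """Return a one-line summary of the negotiated TLS session."""
    cipher = tls_sock.cipher()  # (name, protocol, secret_bits), or None before the handshake
    if cipher is None:
        return "no TLS session established"
    name, _protocol, secret_bits = cipher
    return f"{tls_sock.version()} / {name} / {secret_bits}-bit"
```

Called right after connect in the client, or right after wrap_socket in the server worker, the result can go straight into the existing logger.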
Threat model and limitations
TLS-encrypted communication and certificate-based authentication work as advertised here. What this project does not defend against is more interesting than what it does.
- Hostname verification is disabled on the client. Any cert in the client's trust store can impersonate any server. Fine for localhost, indefensible in production.
- No user-level authentication. Any process that holds client.key can connect. There is no password, no second factor, no per-user identity beyond the certificate itself.
- Messages are not encrypted at rest. TLS protects the wire; it does nothing for logs, history, or local memory. A compromised endpoint leaks plaintext.
- No revocation. If a private key leaks, my only option is to reissue and redistribute. No CRL, no OCSP, no kill switch.
- No message framing. recv(1024) assumes one send produces one readable chunk. TCP doesn't guarantee that.
- No rate limiting. A misbehaving client can open many connections and burn through the server's thread budget.
| Limitation | Proposed fix |
|---|---|
| Hostname verification disabled | Enable it; issue certs via a trusted internal CA |
| No user authentication | Add a login layer (password + session token over TLS) |
| No message encryption at rest | Encrypt logs; access-controlled secure backend |
| Stream framing implicit | Length-prefix every message before send |
| Threaded I/O bottleneck | Move to asyncio, passing the SSLContext via the ssl= argument of asyncio.start_server and asyncio.open_connection |
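The length-prefix fix from the table is small enough to sketch. Every message goes out as a 4-byte big-endian length followed by the UTF-8 payload, and the receiver loops until it has read exactly that many bytes. The function names and the 64 KiB cap are assumptions for illustration, not part of the project code. The functions work on any stream socket, TLS-wrapped or not.

```python
import socket
import struct

_HEADER = struct.Struct("!I")   # 4-byte unsigned length, network byte order
MAX_FRAME = 64 * 1024           # arbitrary cap so a bad header cannot exhaust memory


def send_frame(sock, text):
    """Send one message as <length><utf-8 payload>."""
    payload = text.encode("utf-8")
    if len(payload) > MAX_FRAME:
        raise ValueError("message too large")
    sock.sendall(_HEADER.pack(len(payload)) + payload)


def _recv_exactly(sock, n):
    """Read exactly n bytes; return None on a clean close before any byte arrives."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            if not buf:
                return None
            raise ConnectionError("peer closed mid-frame")
        buf.extend(chunk)
    return bytes(buf)


def recv_frame(sock):
    """Receive one message, or None if the peer closed the connection."""
    header = _recv_exactly(sock, _HEADER.size)
    if header is None:
        return None
    (length,) = _HEADER.unpack(header)
    if length > MAX_FRAME:
        raise ValueError("frame exceeds MAX_FRAME")
    payload = _recv_exactly(sock, length)
    if payload is None:
        raise ConnectionError("peer closed before payload")
    return payload.decode("utf-8")


if __name__ == "__main__":
    left, right = socket.socketpair()
    send_frame(left, "alice: hi")
    send_frame(left, "bob: héllo")
    print(recv_frame(right))
    print(recv_frame(right))
    left.close()
    right.close()
```

Because the payload is decoded only once the whole frame has arrived, this also sidesteps multi-byte characters being split across reads.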
I think being explicit about the second list is more useful than the first. Security gets marketed as a property you have or don't. In practice it's a set of specific properties against specific adversaries, and naming the ones you don't have is how you stop yourself accidentally claiming them.
Reflection
What this exercise gave me was a less mystical relationship with TLS. It's a state machine on top of a socket. The defaults are good. The places where you override them are exactly the places where you're taking responsibility for what "secure" means in your specific setup. The certificate is a file; the trust is a decision; the encryption is just math you chose to plug in.
About two hundred lines of Python all told. But each line is now one I'd defend — which is more than I could have said before I wrote them.
Full source
For reference, the complete contents of all three files. Repo is on GitHub.
generate_certs.py
from datetime import datetime, timedelta, timezone
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa

def generate_cert(cert_name_prefix="server"):
    # 1. Generate a private key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    # 2. Generate public key from the private key
    public_key = private_key.public_key()
    # 3. Define the certificate subject
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"localhost")
    ])
    if cert_name_prefix == "client":
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u"client.localhost"),
        ])
    # 4. Build the certificate
    cert = x509.CertificateBuilder().subject_name(subject
    ).issuer_name(issuer
    ).public_key(public_key
    ).serial_number(x509.random_serial_number()
    ).not_valid_before(datetime.now(timezone.utc)
    ).not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)
    ).add_extension(
        x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
        critical=False,
    ).sign(private_key, hashes.SHA256())
    # 5. Save the private key and the certificate
    with open(f"{cert_name_prefix}.key", "wb") as f:
        f.write(private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ))
    with open(f"{cert_name_prefix}.crt", "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
    print(f"Generated {cert_name_prefix}.key and {cert_name_prefix}.crt successfully")

if __name__ == "__main__":
    generate_cert("server")
    generate_cert("client")
server.py
import socket
import ssl
import threading
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

class SecureChatServer:
    def __init__(self, host="localhost", port=8443):
        self.host = host
        self.port = port
        self.clients = []
        self.setup_server()

    def setup_server(self):
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.ssl_context.load_cert_chain(certfile="server.crt", keyfile="server.key")
        self.ssl_context.load_verify_locations("client.crt")
        logger.info(f"Server listening on {self.host}:{self.port}")

    def handle_client(self, client_socket, address):
        # The handshake runs here, in the worker thread, so a slow client
        # cannot stall the accept loop.
        try:
            ssl_socket = self.ssl_context.wrap_socket(client_socket, server_side=True)
        except OSError as exc:  # ssl.SSLError is a subclass of OSError
            logger.error(f"TLS handshake with {address} failed: {exc}")
            client_socket.close()
            return
        logger.info(f"Secure connection established with {address}")
        self.clients.append(ssl_socket)
        try:
            while True:
                message = ssl_socket.recv(1024).decode('utf-8')
                if not message:
                    break
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                formatted = f"[{timestamp}] {address}: {message}"
                logger.info(formatted)
                self.broadcast(formatted, ssl_socket)
        finally:
            if ssl_socket in self.clients:
                self.clients.remove(ssl_socket)
            ssl_socket.close()

    def broadcast(self, message, sender_socket=None):
        # Iterate over a snapshot: removing from a list while looping over
        # that same list skips the entry after the removed one.
        for client in list(self.clients):
            if client != sender_socket:
                try:
                    client.send(message.encode('utf-8'))
                except Exception:
                    if client in self.clients:
                        self.clients.remove(client)

    def start(self):
        while True:
            client_socket, address = self.server_socket.accept()
            t = threading.Thread(target=self.handle_client, args=(client_socket, address))
            t.daemon = True
            t.start()

if __name__ == "__main__":
    SecureChatServer().start()
client.py
import socket
import ssl
import threading
import logging
import sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

class SecureChatClient:
    def __init__(self, host="localhost", port=8443):
        self.host = host
        self.port = port
        self.username = None
        self.setup_client()

    def setup_client(self):
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        self.ssl_context.load_cert_chain(certfile="client.crt", keyfile="client.key")
        self.ssl_context.check_hostname = False  # for development only
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.ssl_context.load_verify_locations("server.crt")
        self.ssl_socket = self.ssl_context.wrap_socket(
            self.client_socket, server_hostname=self.host
        )

    def send_message(self, message):
        formatted = f"{self.username}: {message}"
        self.ssl_socket.send(formatted.encode('utf-8'))

    def receive_messages(self):
        while True:
            message = self.ssl_socket.recv(1024).decode('utf-8')
            if not message:
                break
            sys.stdout.write('\r\033[K')
            print(message)
            sys.stdout.write('> ')
            sys.stdout.flush()

    def start(self):
        self.ssl_socket.connect((self.host, self.port))
        self.username = input("Enter your username: ").strip()
        self.send_message("has joined the chat")
        t = threading.Thread(target=self.receive_messages)
        t.daemon = True
        t.start()
        while True:
            sys.stdout.write('> ')
            sys.stdout.flush()
            message = input()
            if message.lower() == 'quit':
                self.send_message("has left the chat")
                break
            self.send_message(message)

if __name__ == "__main__":
    SecureChatClient().start()