#
# !!! WARNING !!!
#
# This example uses some private APIs.
#
import argparse
import asyncio
import logging
import ssl
import time
from dataclasses import dataclass, field
from enum import Flag
from typing import Optional, cast
import httpx
from http3_client import HttpClient
from quic_logger import QuicDirectoryLogger
from aioquic.asyncio import connect
from aioquic.h0.connection import H0_ALPN
from aioquic.h3.connection import H3_ALPN, H3Connection
from aioquic.h3.events import DataReceived, HeadersReceived, PushPromiseReceived
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.logger import QuicLogger
class Result(Flag):
V = 0x000001
H = 0x000002
D = 0x000004
C = 0x000008
R = 0x000010
Z = 0x000020
S = 0x000040
Q = 0x000080
M = 0x000100
B = 0x000200
A = 0x000400
U = 0x000800
P = 0x001000
E = 0x002000
L = 0x004000
T = 0x008000
three = 0x010000
d = 0x020000
p = 0x040000
def __str__(self):
flags = sorted(
map(
lambda x: getattr(Result, x),
filter(lambda x: not x.startswith("_"), dir(Result)),
),
key=lambda x: x.value,
)
result_str = ""
for flag in flags:
if self & flag:
result_str += flag.name
else:
result_str += "-"
return result_str
@dataclass
class Server:
name: str
host: str
port: int = 4433
http3: bool = True
http3_port: Optional[int] = None
retry_port: Optional[int] = 4434
path: str = "/"
push_path: Optional[str] = None
result: Result = field(default_factory=lambda: Result(0))
session_resumption_port: Optional[int] = None
structured_logging: bool = False
throughput_path: Optional[str] = "/%(size)d"
verify_mode: Optional[int] = None
SERVERS = [
Server("akamaiquic", "ietf.akaquic.com", port=443, verify_mode=ssl.CERT_NONE),
Server(
"aioquic", "quic.aiortc.org", port=443, push_path="/", structured_logging=True
),
Server("ats", "quic.ogre.com"),
Server("f5", "f5quic.com", retry_port=4433, throughput_path=None),
Server(
"haskell", "mew.org", structured_logging=True, throughput_path="/num/%(size)s"
),
Server("gquic", "quic.rocks", retry_port=None),
Server("lsquic", "http3-test.litespeedtech.com", push_path="/200?push=/100"),
Server(
"msquic",
"quic.westus.cloudapp.azure.com",
structured_logging=True,
throughput_path=None, # "/%(size)d.txt",
verify_mode=ssl.CERT_NONE,
),
Server(
"mvfst",
"fb.mvfst.net",
port=443,
push_path="/push",
retry_port=None,
structured_logging=True,
),
Server(
"ngtcp2",
"nghttp2.org",
push_path="/?push=/100",
structured_logging=True,
throughput_path=None,
),
Server("ngx_quic", "cloudflare-quic.com", port=443, retry_port=None),
Server("pandora", "pandora.cm.in.tum.de", verify_mode=ssl.CERT_NONE),
Server("picoquic", "test.privateoctopus.com", structured_logging=True),
Server("quant", "quant.eggert.org", http3=False, structured_logging=True),
Server("quic-go", "interop.seemann.io", port=443, retry_port=443),
Server("quiche", "quic.tech", port=8443, retry_port=8444),
Server("quicly", "quic.examp1e.net", http3_port=443),
Server("quinn", "h3.stammw.eu", port=443),
]
async def test_version_negotiation(server: Server, configuration: QuicConfiguration):
# force version negotiation
configuration.supported_versions.insert(0, 0x1A2A3A4A)
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
await protocol.ping()
# check log
for stamp, category, event, data in configuration.quic_logger.to_dict()[
"traces"
][0]["events"]:
if (
category == "transport"
and event == "packet_received"
and data["packet_type"] == "version_negotiation"
):
server.result |= Result.V
async def test_handshake_and_close(server: Server, configuration: QuicConfiguration):
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
await protocol.ping()
server.result |= Result.H
server.result |= Result.C
async def test_retry(server: Server, configuration: QuicConfiguration):
# skip test if there is not retry port
if server.retry_port is None:
return
async with connect(
server.host, server.retry_port, configuration=configuration
) as protocol:
await protocol.ping()
# check log
for stamp, category, event, data in configuration.quic_logger.to_dict()[
"traces"
][0]["events"]:
if (
category == "transport"
and event == "packet_received"
and data["packet_type"] == "retry"
):
server.result |= Result.S
async def test_quantum_readiness(server: Server, configuration: QuicConfiguration):
configuration.quantum_readiness_test = True
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
await protocol.ping()
server.result |= Result.Q
async def test_http_0(server: Server, configuration: QuicConfiguration):
if server.path is None:
return
configuration.alpn_protocols = H0_ALPN
async with connect(
server.host,
server.port,
configuration=configuration,
create_protocol=HttpClient,
) as protocol:
protocol = cast(HttpClient, protocol)
# perform HTTP request
events = await protocol.get(
"https://{}:{}{}".format(server.host, server.port, server.path)
)
if events and isinstance(events[0], HeadersReceived):
server.result |= Result.D
async def test_http_3(server: Server, configuration: QuicConfiguration):
port = server.http3_port or server.port
if server.path is None:
return
configuration.alpn_protocols = H3_ALPN
async with connect(
server.host, port, configuration=configuration, create_protocol=HttpClient,
) as protocol:
protocol = cast(HttpClient, protocol)
# perform HTTP request
events = await protocol.get(
"https://{}:{}{}".format(server.host, server.port, server.path)
)
if events and isinstance(events[0], HeadersReceived):
server.result |= Result.D
server.result |= Result.three
# perform more HTTP requests to use QPACK dynamic tables
for i in range(2):
events = await protocol.get(
"https://{}:{}{}".format(server.host, server.port, server.path)
)
if events and isinstance(events[0], HeadersReceived):
http = cast(H3Connection, protocol._http)
protocol._quic._logger.info(
"QPACK decoder bytes RX %d TX %d",
http._decoder_bytes_received,
http._decoder_bytes_sent,
)
protocol._quic._logger.info(
"QPACK encoder bytes RX %d TX %d",
http._encoder_bytes_received,
http._encoder_bytes_sent,
)
if (
http._decoder_bytes_received
and http._decoder_bytes_sent
and http._encoder_bytes_received
and http._encoder_bytes_sent
):
server.result |= Result.d
# check push support
if server.push_path is not None:
protocol.pushes.clear()
await protocol.get(
"https://{}:{}{}".format(server.host, server.port, server.push_path)
)
await asyncio.sleep(0.5)
for push_id, events in protocol.pushes.items():
if (
len(events) >= 3
and isinstance(events[0], PushPromiseReceived)
and isinstance(events[1], HeadersReceived)
and isinstance(events[2], DataReceived)
):
protocol._quic._logger.info(
"Push promise %d for %s received (status %s)",
push_id,
dict(events[0].headers)[b":path"].decode("ascii"),
int(dict(events[1].headers)[b":status"]),
)
server.result |= Result.p
async def test_session_resumption(server: Server, configuration: QuicConfiguration):
port = server.session_resumption_port or server.port
saved_ticket = None
def session_ticket_handler(ticket):
nonlocal saved_ticket
saved_ticket = ticket
# connect a first time, receive a ticket
async with connect(
server.host,
port,
configuration=configuration,
session_ticket_handler=session_ticket_handler,
) as protocol:
await protocol.ping()
# some servers don't send the ticket immediately
await asyncio.sleep(1)
# connect a second time, with the ticket
if saved_ticket is not None:
configuration.session_ticket = saved_ticket
async with connect(server.host, port, configuration=configuration) as protocol:
await protocol.ping()
# check session was resumed
if protocol._quic.tls.session_resumed:
server.result |= Result.R
# check early data was accepted
if protocol._quic.tls.early_data_accepted:
server.result |= Result.Z
async def test_key_update(server: Server, configuration: QuicConfiguration):
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
# cause some traffic
await protocol.ping()
# request key update
protocol.request_key_update()
# cause more traffic
await protocol.ping()
server.result |= Result.U
async def test_server_cid_change(server: Server, configuration: QuicConfiguration):
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
# cause some traffic
await protocol.ping()
# change connection ID
protocol.change_connection_id()
# cause more traffic
await protocol.ping()
server.result |= Result.M
async def test_nat_rebinding(server: Server, configuration: QuicConfiguration):
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
# cause some traffic
await protocol.ping()
# replace transport
protocol._transport.close()
await loop.create_datagram_endpoint(lambda: protocol, local_addr=("::", 0))
# cause more traffic
await protocol.ping()
# check log
path_challenges = 0
for stamp, category, event, data in configuration.quic_logger.to_dict()[
"traces"
][0]["events"]:
if (
category == "transport"
and event == "packet_received"
and data["packet_type"] == "1RTT"
):
for frame in data["frames"]:
if frame["frame_type"] == "path_challenge":
path_challenges += 1
if not path_challenges:
protocol._quic._logger.warning("No PATH_CHALLENGE received")
else:
server.result |= Result.B
async def test_address_mobility(server: Server, configuration: QuicConfiguration):
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
# cause some traffic
await protocol.ping()
# replace transport
protocol._transport.close()
await loop.create_datagram_endpoint(lambda: protocol, local_addr=("::", 0))
# change connection ID
protocol.change_connection_id()
# cause more traffic
await protocol.ping()
# check log
path_challenges = 0
for stamp, category, event, data in configuration.quic_logger.to_dict()[
"traces"
][0]["events"]:
if (
category == "transport"
and event == "packet_received"
and data["packet_type"] == "1RTT"
):
for frame in data["frames"]:
if frame["frame_type"] == "path_challenge":
path_challenges += 1
if not path_challenges:
protocol._quic._logger.warning("No PATH_CHALLENGE received")
else:
server.result |= Result.A
async def test_spin_bit(server: Server, configuration: QuicConfiguration):
async with connect(
server.host, server.port, configuration=configuration
) as protocol:
for i in range(5):
await protocol.ping()
# check log
spin_bits = set()
for stamp, category, event, data in configuration.quic_logger.to_dict()[
"traces"
][0]["events"]:
if category == "connectivity" and event == "spin_bit_updated":
spin_bits.add(data["state"])
if len(spin_bits) == 2:
server.result |= Result.P
async def test_throughput(server: Server, configuration: QuicConfiguration):
failures = 0
if server.throughput_path is None:
return
for size in [5000000, 10000000]:
path = server.throughput_path % {"size": size}
print("Testing %d bytes download: %s" % (size, path))
# perform HTTP request over TCP
start = time.time()
response = httpx.get("https://" + server.host + path, verify=False)
tcp_octets = len(response.content)
tcp_elapsed = time.time() - start
assert tcp_octets == size, "HTTP/TCP response size mismatch"
# perform HTTP request over QUIC
if server.http3:
configuration.alpn_protocols = H3_ALPN
port = server.http3_port or server.port
else:
configuration.alpn_protocols = H0_ALPN
port = server.port
start = time.time()
async with connect(
server.host, port, configuration=configuration, create_protocol=HttpClient,
) as protocol:
protocol = cast(HttpClient, protocol)
http_events = await protocol.get(
"https://{}:{}{}".format(server.host, server.port, path)
)
quic_elapsed = time.time() - start
quic_octets = 0
for http_event in http_events:
if isinstance(http_event, DataReceived):
quic_octets += len(http_event.data)
assert quic_octets == size, "HTTP/QUIC response size mismatch"
print(" - HTTP/TCP completed in %.3f s" % tcp_elapsed)
print(" - HTTP/QUIC completed in %.3f s" % quic_elapsed)
if quic_elapsed > 1.1 * tcp_elapsed:
failures += 1
print(" => FAIL")
else:
print(" => PASS")
if failures == 0:
server.result |= Result.T
def print_result(server: Server) -> None:
result = str(server.result).replace("three", "3")
result = result[0:8] + " " + result[8:16] + " " + result[16:]
print("%s%s%s" % (server.name, " " * (20 - len(server.name)), result))
async def run(servers, tests, quic_log=False, secrets_log_file=None) -> None:
for server in servers:
if server.structured_logging:
server.result |= Result.L
for test_name, test_func in tests:
print("\n=== %s %s ===\n" % (server.name, test_name))
configuration = QuicConfiguration(
alpn_protocols=H3_ALPN + H0_ALPN,
is_client=True,
quic_logger=QuicDirectoryLogger(quic_log) if quic_log else QuicLogger(),
secrets_log_file=secrets_log_file,
verify_mode=server.verify_mode,
)
if test_name == "test_throughput":
timeout = 120
else:
timeout = 10
try:
await asyncio.wait_for(
test_func(server, configuration), timeout=timeout
)
except Exception as exc:
print(exc)
print("")
print_result(server)
# print summary
if len(servers) > 1:
print("SUMMARY")
for server in servers:
print_result(server)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="QUIC interop client")
parser.add_argument(
"-q",
"--quic-log",
type=str,
help="log QUIC events to QLOG files in the specified directory",
)
parser.add_argument(
"--server", type=str, help="only run against the specified server."
)
parser.add_argument("--test", type=str, help="only run the specifed test.")
parser.add_argument(
"-l",
"--secrets-log",
type=str,
help="log secrets to a file, for use with Wireshark",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="increase logging verbosity"
)
args = parser.parse_args()
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s",
level=logging.DEBUG if args.verbose else logging.INFO,
)
# open SSL log file
if args.secrets_log:
secrets_log_file = open(args.secrets_log, "a")
else:
secrets_log_file = None
# determine what to run
servers = SERVERS
tests = list(filter(lambda x: x[0].startswith("test_"), globals().items()))
if args.server:
servers = list(filter(lambda x: x.name == args.server, servers))
if args.test:
tests = list(filter(lambda x: x[0] == args.test, tests))
loop = asyncio.get_event_loop()
loop.run_until_complete(
run(
servers=servers,
tests=tests,
quic_log=args.quic_log,
secrets_log_file=secrets_log_file,
)
)