#
# !!! WARNING !!!
#
# This example uses some private APIs.
#

import argparse
import asyncio
import logging
import ssl
import time
from dataclasses import dataclass, field
from enum import Flag
from typing import Optional, cast

import httpx
from http3_client import HttpClient
from quic_logger import QuicDirectoryLogger

from aioquic.asyncio import connect
from aioquic.h0.connection import H0_ALPN
from aioquic.h3.connection import H3_ALPN, H3Connection
from aioquic.h3.events import DataReceived, HeadersReceived, PushPromiseReceived
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.logger import QuicLogger


class Result(Flag):
    """Bit flags recording which interop checks a server has passed.

    Each member name doubles as the character printed for that check in the
    result matrix (see ``__str__`` and ``print_result``).
    """

    V = 0x000001  # version negotiation packet received
    H = 0x000002  # handshake completed
    D = 0x000004  # HTTP response headers received (HTTP/0.9 or HTTP/3)
    C = 0x000008  # connection closed
    R = 0x000010  # TLS session resumed
    Z = 0x000020  # early data accepted
    S = 0x000040  # retry packet received
    Q = 0x000080  # quantum readiness test completed
    M = 0x000100  # traffic continued after changing connection ID
    B = 0x000200  # PATH_CHALLENGE received after replacing the transport
    A = 0x000400  # PATH_CHALLENGE received after new transport + new CID
    U = 0x000800  # traffic continued after a key update
    P = 0x001000  # both spin bit states observed
    E = 0x002000  # not set by any test in this file
    L = 0x004000  # server is marked as supporting structured logging
    T = 0x008000  # throughput test passed
    three = 0x010000  # HTTP/3 request succeeded
    d = 0x020000  # QPACK encoder and decoder streams carried data both ways
    p = 0x040000  # server push received

    def __str__(self):
        """Return one character per flag, in ascending bit order.

        A set flag is rendered as its name, a cleared flag as ``-``.
        """
        flags = sorted(Result.__members__.values(), key=lambda flag: flag.value)
        return "".join(flag.name if self & flag else "-" for flag in flags)


@dataclass
class Server:
    """Description of one interop target and the results collected for it."""

    name: str
    host: str
    # port used by every test that has no dedicated port override
    port: int = 4433
    # when False, the throughput test uses HTTP/0.9 instead of HTTP/3
    http3: bool = True
    # overrides ``port`` for HTTP/3 requests when set
    http3_port: Optional[int] = None
    # port used by the retry test; None skips that test
    retry_port: Optional[int] = 4434
    # path requested by the HTTP tests
    path: str = "/"
    # path requested to check server push; None skips the push check
    push_path: Optional[str] = None
    # flags accumulated while the tests run
    result: Result = field(default_factory=lambda: Result(0))
    # overrides ``port`` for the session resumption test when set
    session_resumption_port: Optional[int] = None
    # when True, the L flag is set before the tests run
    structured_logging: bool = False
    # %-style template taking a ``size`` key; None skips the throughput test
    throughput_path: Optional[str] = "/%(size)d"
    # passed through to QuicConfiguration(verify_mode=...)
    verify_mode: Optional[int] = None


SERVERS = [
    Server("akamaiquic", "ietf.akaquic.com", port=443, verify_mode=ssl.CERT_NONE),
    Server(
        "aioquic", "quic.aiortc.org", port=443, push_path="/", structured_logging=True
    ),
    Server("ats", "quic.ogre.com"),
    Server("f5", "f5quic.com", retry_port=4433, throughput_path=None),
    Server(
        "haskell", "mew.org", structured_logging=True, throughput_path="/num/%(size)s"
    ),
    Server("gquic", "quic.rocks", retry_port=None),
    Server("lsquic", "http3-test.litespeedtech.com", push_path="/200?push=/100"),
    Server(
        "msquic",
        "quic.westus.cloudapp.azure.com",
        structured_logging=True,
        throughput_path=None,  # "/%(size)d.txt",
        verify_mode=ssl.CERT_NONE,
    ),
    Server(
        "mvfst",
        "fb.mvfst.net",
        port=443,
        push_path="/push",
        retry_port=None,
        structured_logging=True,
    ),
    Server(
        "ngtcp2",
        "nghttp2.org",
        push_path="/?push=/100",
        structured_logging=True,
        throughput_path=None,
    ),
    Server("ngx_quic", "cloudflare-quic.com", port=443, retry_port=None),
    Server("pandora", "pandora.cm.in.tum.de", verify_mode=ssl.CERT_NONE),
    Server("picoquic", "test.privateoctopus.com", structured_logging=True),
    Server("quant", "quant.eggert.org", http3=False, structured_logging=True),
    Server("quic-go", "interop.seemann.io", port=443, retry_port=443),
    Server("quiche", "quic.tech", port=8443, retry_port=8444),
    Server("quicly", "quic.examp1e.net", http3_port=443),
    Server("quinn", "h3.stammw.eu", port=443),
]


# NOTE: run-time test discovery picks up every module global whose name starts
# with "test_", so the private helpers below deliberately avoid that prefix.


def _first_trace_events(configuration: QuicConfiguration):
    """Return the event list of the first trace held by the QUIC logger.

    Raises IndexError when the logger holds no trace.
    """
    return configuration.quic_logger.to_dict()["traces"][0]["events"]


def _received_packet_of_type(
    configuration: QuicConfiguration, packet_type: str
) -> bool:
    """Tell whether the first trace logged a received packet of this type."""
    return any(
        category == "transport"
        and event == "packet_received"
        and data["packet_type"] == packet_type
        for _stamp, category, event, data in _first_trace_events(configuration)
    )


def _count_path_challenges(configuration: QuicConfiguration) -> int:
    """Count path_challenge frames in the received 1RTT packets of the first trace."""
    return sum(
        1
        for _stamp, category, event, data in _first_trace_events(configuration)
        if category == "transport"
        and event == "packet_received"
        and data["packet_type"] == "1RTT"
        for frame in data["frames"]
        if frame["frame_type"] == "path_challenge"
    )


async def _replace_transport(protocol) -> None:
    """Close the protocol's transport and attach it to a new datagram endpoint."""
    protocol._transport.close()
    # use the loop that is running this coroutine rather than a module global,
    # which only exists when the file is executed as a script
    loop = asyncio.get_running_loop()
    await loop.create_datagram_endpoint(lambda: protocol, local_addr=("::", 0))


async def test_version_negotiation(
    server: Server, configuration: QuicConfiguration
) -> None:
    """Set flag V if the server answers an unknown version with version negotiation."""
    # force version negotiation
    configuration.supported_versions.insert(0, 0x1A2A3A4A)

    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        await protocol.ping()

        # check log
        if _received_packet_of_type(configuration, "version_negotiation"):
            server.result |= Result.V


async def test_handshake_and_close(
    server: Server, configuration: QuicConfiguration
) -> None:
    """Set flag H once a ping succeeds and flag C once the connection is closed."""
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        await protocol.ping()
        server.result |= Result.H
    server.result |= Result.C


async def test_retry(server: Server, configuration: QuicConfiguration) -> None:
    """Set flag S if a retry packet is received on the server's retry port."""
    # skip test if there is not retry port
    if server.retry_port is None:
        return

    async with connect(
        server.host, server.retry_port, configuration=configuration
    ) as protocol:
        await protocol.ping()

        # check log
        if _received_packet_of_type(configuration, "retry"):
            server.result |= Result.S


async def test_quantum_readiness(
    server: Server, configuration: QuicConfiguration
) -> None:
    """Set flag Q if a ping succeeds with the quantum readiness test enabled."""
    configuration.quantum_readiness_test = True
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        await protocol.ping()
        server.result |= Result.Q


async def test_http_0(server: Server, configuration: QuicConfiguration) -> None:
    """Set flag D if a request using the H0 ALPN yields response headers."""
    if server.path is None:
        return

    configuration.alpn_protocols = H0_ALPN
    async with connect(
        server.host,
        server.port,
        configuration=configuration,
        create_protocol=HttpClient,
    ) as protocol:
        protocol = cast(HttpClient, protocol)

        # perform HTTP request
        events = await protocol.get(
            "https://{}:{}{}".format(server.host, server.port, server.path)
        )
        if events and isinstance(events[0], HeadersReceived):
            server.result |= Result.D


async def test_http_3(server: Server, configuration: QuicConfiguration) -> None:
    """Exercise HTTP/3: basic request (D, three), QPACK streams (d), push (p)."""
    port = server.http3_port or server.port
    if server.path is None:
        return

    # Build URLs with the port we actually connect to, so the request
    # authority matches the connection when http3_port overrides port.
    url = "https://{}:{}{}".format(server.host, port, server.path)

    configuration.alpn_protocols = H3_ALPN
    async with connect(
        server.host,
        port,
        configuration=configuration,
        create_protocol=HttpClient,
    ) as protocol:
        protocol = cast(HttpClient, protocol)

        # perform HTTP request
        events = await protocol.get(url)
        if events and isinstance(events[0], HeadersReceived):
            server.result |= Result.D
            server.result |= Result.three

        # perform more HTTP requests to use QPACK dynamic tables
        for _ in range(2):
            events = await protocol.get(url)
        if events and isinstance(events[0], HeadersReceived):
            http = cast(H3Connection, protocol._http)
            protocol._quic._logger.info(
                "QPACK decoder bytes RX %d TX %d",
                http._decoder_bytes_received,
                http._decoder_bytes_sent,
            )
            protocol._quic._logger.info(
                "QPACK encoder bytes RX %d TX %d",
                http._encoder_bytes_received,
                http._encoder_bytes_sent,
            )
            if (
                http._decoder_bytes_received
                and http._decoder_bytes_sent
                and http._encoder_bytes_received
                and http._encoder_bytes_sent
            ):
                server.result |= Result.d

        # check push support
        if server.push_path is not None:
            protocol.pushes.clear()
            await protocol.get(
                "https://{}:{}{}".format(server.host, port, server.push_path)
            )
            # leave some time for the pushed response to arrive
            await asyncio.sleep(0.5)
            for push_id, events in protocol.pushes.items():
                if (
                    len(events) >= 3
                    and isinstance(events[0], PushPromiseReceived)
                    and isinstance(events[1], HeadersReceived)
                    and isinstance(events[2], DataReceived)
                ):
                    protocol._quic._logger.info(
                        "Push promise %d for %s received (status %s)",
                        push_id,
                        dict(events[0].headers)[b":path"].decode("ascii"),
                        int(dict(events[1].headers)[b":status"]),
                    )

                    server.result |= Result.p


async def test_session_resumption(
    server: Server, configuration: QuicConfiguration
) -> None:
    """Set flag R if a second connection resumes the session, Z if early data is accepted."""
    port = server.session_resumption_port or server.port
    saved_ticket = None

    def session_ticket_handler(ticket):
        nonlocal saved_ticket
        saved_ticket = ticket

    # connect a first time, receive a ticket
    async with connect(
        server.host,
        port,
        configuration=configuration,
        session_ticket_handler=session_ticket_handler,
    ) as protocol:
        await protocol.ping()

        # some servers don't send the ticket immediately
        await asyncio.sleep(1)

    # connect a second time, with the ticket
    if saved_ticket is not None:
        configuration.session_ticket = saved_ticket
        async with connect(server.host, port, configuration=configuration) as protocol:
            await protocol.ping()

            # check session was resumed
            if protocol._quic.tls.session_resumed:
                server.result |= Result.R

            # check early data was accepted
            if protocol._quic.tls.early_data_accepted:
                server.result |= Result.Z


async def test_key_update(server: Server, configuration: QuicConfiguration) -> None:
    """Set flag U if a ping still succeeds after requesting a key update."""
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        # cause some traffic
        await protocol.ping()

        # request key update
        protocol.request_key_update()

        # cause more traffic
        await protocol.ping()

        server.result |= Result.U


async def test_server_cid_change(
    server: Server, configuration: QuicConfiguration
) -> None:
    """Set flag M if a ping still succeeds after changing the connection ID."""
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        # cause some traffic
        await protocol.ping()

        # change connection ID
        protocol.change_connection_id()

        # cause more traffic
        await protocol.ping()

        server.result |= Result.M


async def test_nat_rebinding(server: Server, configuration: QuicConfiguration) -> None:
    """Set flag B if the server sends PATH_CHALLENGE after the transport is replaced."""
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        # cause some traffic
        await protocol.ping()

        # replace transport
        await _replace_transport(protocol)

        # cause more traffic
        await protocol.ping()

        # check log
        if _count_path_challenges(configuration):
            server.result |= Result.B
        else:
            protocol._quic._logger.warning("No PATH_CHALLENGE received")


async def test_address_mobility(
    server: Server, configuration: QuicConfiguration
) -> None:
    """Set flag A if PATH_CHALLENGE arrives after a new transport and a new connection ID."""
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        # cause some traffic
        await protocol.ping()

        # replace transport
        await _replace_transport(protocol)

        # change connection ID
        protocol.change_connection_id()

        # cause more traffic
        await protocol.ping()

        # check log
        if _count_path_challenges(configuration):
            server.result |= Result.A
        else:
            protocol._quic._logger.warning("No PATH_CHALLENGE received")


async def test_spin_bit(server: Server, configuration: QuicConfiguration) -> None:
    """Set flag P if the log shows two distinct spin bit states."""
    async with connect(
        server.host, server.port, configuration=configuration
    ) as protocol:
        for _ in range(5):
            await protocol.ping()

        # check log
        spin_bits = {
            data["state"]
            for _stamp, category, event, data in _first_trace_events(configuration)
            if category == "connectivity" and event == "spin_bit_updated"
        }
        if len(spin_bits) == 2:
            server.result |= Result.P


async def test_throughput(server: Server, configuration: QuicConfiguration) -> None:
    """Set flag T if HTTP over QUIC is at most 10% slower than HTTP over TCP.

    Downloads 5 MB and 10 MB bodies over both transports.

    Raises:
        AssertionError: a response body does not have the requested size.
    """
    failures = 0
    if server.throughput_path is None:
        return

    # the ALPN and port do not depend on the download size
    if server.http3:
        configuration.alpn_protocols = H3_ALPN
        port = server.http3_port or server.port
    else:
        configuration.alpn_protocols = H0_ALPN
        port = server.port

    for size in [5000000, 10000000]:
        path = server.throughput_path % {"size": size}
        print("Testing %d bytes download: %s" % (size, path))

        # perform HTTP request over TCP
        # NOTE: httpx.get() is a blocking call, so the event loop (and the
        # caller's timeout) makes no progress until it returns.
        start = time.time()
        response = httpx.get("https://" + server.host + path, verify=False)
        tcp_octets = len(response.content)
        tcp_elapsed = time.time() - start
        # raised explicitly so the check survives running under ``python -O``
        if tcp_octets != size:
            raise AssertionError("HTTP/TCP response size mismatch")

        # perform HTTP request over QUIC
        start = time.time()
        async with connect(
            server.host,
            port,
            configuration=configuration,
            create_protocol=HttpClient,
        ) as protocol:
            protocol = cast(HttpClient, protocol)

            # use the port we connected to, so the authority matches
            http_events = await protocol.get(
                "https://{}:{}{}".format(server.host, port, path)
            )
            quic_elapsed = time.time() - start
        quic_octets = sum(
            len(http_event.data)
            for http_event in http_events
            if isinstance(http_event, DataReceived)
        )
        if quic_octets != size:
            raise AssertionError("HTTP/QUIC response size mismatch")

        print(" - HTTP/TCP completed in %.3f s" % tcp_elapsed)
        print(" - HTTP/QUIC completed in %.3f s" % quic_elapsed)

        if quic_elapsed > 1.1 * tcp_elapsed:
            failures += 1
            print(" => FAIL")
        else:
            print(" => PASS")

    if failures == 0:
        server.result |= Result.T


def print_result(server: Server) -> None:
    """Print the server name padded to 20 columns, then its flags in groups of eight."""
    result = str(server.result).replace("three", "3")
    result = result[0:8] + " " + result[8:16] + " " + result[16:]
    print("%s%s" % (server.name.ljust(20), result))


async def run(servers, tests, quic_log=False, secrets_log_file=None) -> None:
    """Run every test against every server and print the result matrix.

    Args:
        servers: the Server entries to test; their ``result`` is updated in place.
        tests: ``(name, coroutine function)`` pairs, each called with
            ``(server, configuration)``.
        quic_log: directory handed to QuicDirectoryLogger; when falsy, a
            plain QuicLogger is used instead.
        secrets_log_file: file object passed to QuicConfiguration, or None.
    """
    for server in servers:
        if server.structured_logging:
            server.result |= Result.L
        for test_name, test_func in tests:
            print("\n=== %s %s ===\n" % (server.name, test_name))
            # every test gets a fresh configuration, and therefore a fresh log
            configuration = QuicConfiguration(
                alpn_protocols=H3_ALPN + H0_ALPN,
                is_client=True,
                quic_logger=QuicDirectoryLogger(quic_log) if quic_log else QuicLogger(),
                secrets_log_file=secrets_log_file,
                verify_mode=server.verify_mode,
            )
            timeout = 120 if test_name == "test_throughput" else 10
            try:
                await asyncio.wait_for(
                    test_func(server, configuration), timeout=timeout
                )
            except Exception as exc:
                # a failing test must not abort the run; exceptions without a
                # message (e.g. timeouts) are shown by their repr instead of
                # as an empty line
                print(str(exc) or repr(exc))

        print("")
        print_result(server)

    # print summary
    if len(servers) > 1:
        print("SUMMARY")
        for server in servers:
            print_result(server)


def main() -> None:
    """Parse the command line and run the selected tests against the selected servers."""
    parser = argparse.ArgumentParser(description="QUIC interop client")
    parser.add_argument(
        "-q",
        "--quic-log",
        type=str,
        help="log QUIC events to QLOG files in the specified directory",
    )
    parser.add_argument(
        "--server", type=str, help="only run against the specified server."
    )
    parser.add_argument("--test", type=str, help="only run the specified test.")
    parser.add_argument(
        "-l",
        "--secrets-log",
        type=str,
        help="log secrets to a file, for use with Wireshark",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="increase logging verbosity"
    )

    args = parser.parse_args()

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        level=logging.DEBUG if args.verbose else logging.INFO,
    )

    # determine what to run
    servers = SERVERS
    tests = [item for item in globals().items() if item[0].startswith("test_")]
    if args.server:
        servers = [server for server in servers if server.name == args.server]
    if args.test:
        tests = [item for item in tests if item[0] == args.test]

    # open SSL log file
    secrets_log_file = open(args.secrets_log, "a") if args.secrets_log else None
    try:
        asyncio.run(
            run(
                servers=servers,
                tests=tests,
                quic_log=args.quic_log,
                secrets_log_file=secrets_log_file,
            )
        )
    finally:
        # make sure the secrets are flushed to disk even if the run fails
        if secrets_log_file is not None:
            secrets_log_file.close()


if __name__ == "__main__":
    main()