import grpc
from concurrent import futures
import os
from proto import fsl_pb2_grpc
from src.shared.common import cfg
from src.shared.runtime import grpc_channel_options
def run_server(servicer) -> None:
max_workers = cfg.get("server", {}).get("max_workers", 10)
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=max_workers),
options=grpc_channel_options(),
)
fsl_pb2_grpc.add_FSLServiceServicer_to_server(servicer, server)
grpc_cfg = cfg.get("grpc", {})
server_port = grpc_cfg.get("server_port", 50051)
bind_host = os.getenv("FSL_SERVER_BIND_HOST", str(grpc_cfg.get("bind_host", "[::]")))
bind_addr = f"{bind_host}:{server_port}"
tls_enabled = grpc_cfg.get("tls_enabled", False)
if tls_enabled:
cert_path = grpc_cfg.get("tls_cert_path")
key_path = grpc_cfg.get("tls_key_path")
if not cert_path or not key_path:
raise RuntimeError(
"grpc.tls_enabled is true but tls_cert_path / tls_key_path are not set in config."
)
with open(cert_path, "rb") as fh:
cert_pem = fh.read()
with open(key_path, "rb") as fh:
key_pem = fh.read()
credentials = grpc.ssl_server_credentials([(key_pem, cert_pem)])
bind_result = server.add_secure_port(bind_addr, credentials)
print(f"[SERVER] TLS enabled (cert={cert_path})")
else:
bind_result = server.add_insecure_port(bind_addr)
if bind_result == 0 and bind_host == "[::]":
fallback_addr = f"0.0.0.0:{server_port}"
print(f"[SERVER] Failed to bind {bind_addr}; retrying with {fallback_addr}")
bind_result = server.add_insecure_port(fallback_addr)
bind_addr = fallback_addr
if bind_result == 0:
raise RuntimeError(
f"Failed to bind gRPC server on {bind_addr}. "
"Try setting FSL_SERVER_BIND_HOST=0.0.0.0."
)
server.start()
print(f"[SERVER] Listening for incoming FSL connections on {bind_addr}...")
try:
while True:
# Timeout is only used as a periodic tick for checking custom shutdown conditions.
# Do not branch on the return value here because grpc's return semantics are not
# a reliable "server terminated" signal across versions.
server.wait_for_termination(timeout=1.0)
if hasattr(servicer, "should_shutdown") and servicer.should_shutdown():
print("[SERVER] Shutdown requested by servicer. Stopping server...")
break
except KeyboardInterrupt:
print("\n[SERVER] Keyboard interrupt received. Shutting down gracefully...")
finally:
print("[SERVER] Executing safety mechanism: Flushing remaining logs...")
if hasattr(servicer, "flush_logs"):
servicer.flush_logs()
# Give a 1s grace period for pending RPC threads to finish to avoid "terminate called" crash.
server.stop(1).wait()
print("[SERVER] Shutdown complete.")