import flwr as fl
from flwr.server.strategy import FedAvg
from flwr.server.server import ServerConfig
from MEDfl.rw.strategy import Strategy
import asyncio
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.common import GetPropertiesIns
from flwr.common import GetPropertiesIns
[docs]class FederatedServer:
"""
FederatedServer wraps the launch and configuration of a Flower federated learning server.
Attributes:
server_address (str): Server host and port in the format "host:port".
server_config (ServerConfig): Configuration for the Flower server.
strategy_wrapper (Strategy): Wrapper around the actual Flower strategy.
strategy (flwr.server.Strategy): Actual Flower strategy instance.
certificates (Any): Optional TLS certificates.
connected_clients (list): List of connected client IDs.
Methods:
start():
Launch the Flower server with the specified strategy and log client connections.
"""
[docs] def __init__(
self,
host="0.0.0.0",
port=8080,
num_rounds=3,
strategy=None,
certificates=None,
):
"""
Initialize the FederatedServer.
Args:
host (str): Hostname or IP to bind the server to.
port (int): Port to listen on.
num_rounds (int): Number of federated learning rounds to execute.
strategy (Optional[Strategy]): Optional custom strategy wrapper.
certificates (Optional[Any]): Optional TLS certificates.
"""
# Server address and configuration
self.server_address = f"{host}:{port}"
self.server_config = ServerConfig(num_rounds=num_rounds)
# Use custom or default strategy
self.strategy_wrapper = strategy or Strategy()
self.strategy_wrapper.create_strategy()
if self.strategy_wrapper.strategy_object is None:
raise ValueError("Strategy object not initialized. Call create_strategy() first.")
self.strategy = self.strategy_wrapper.strategy_object
self.certificates = certificates
self.connected_clients = [] # Track connected client IDs
[docs] def start(self):
"""
Start the Flower server with the configured strategy and track client connections.
"""
print(f"Using strategy: {self.strategy_wrapper.name}")
print(f"Starting Flower server on {self.server_address} with strategy {self.strategy_wrapper.name}")
# Use a custom client manager that logs client connections
client_manager = TrackingClientManager(self)
# Launch the Flower server
fl.server.start_server(
server_address=self.server_address,
config=self.server_config,
strategy=self.strategy,
certificates=self.certificates,
client_manager=client_manager,
)
[docs]class TrackingClientManager(fl.server.client_manager.SimpleClientManager):
"""
TrackingClientManager extends the default SimpleClientManager to log client connections.
Attributes:
server (FederatedServer): The FederatedServer instance this manager belongs to.
client_properties (dict): Placeholder for storing client-specific properties.
"""
[docs] def __init__(self, server):
"""
Initialize the TrackingClientManager.
Args:
server (FederatedServer): Reference to the FederatedServer.
"""
super().__init__()
self.server = server
self.client_properties = {}
[docs] def register(self, client):
"""
Register a client and log its connection.
Args:
client (ClientProxy): The client proxy being registered.
Returns:
bool: True if the client was registered successfully.
"""
success = super().register(client)
if success and client.cid not in self.server.connected_clients:
# Run the asynchronous hostname fetch synchronously
asyncio.run(self._fetch_and_log_hostname(client))
return success
[docs] async def _fetch_and_log_hostname(self, client):
"""
Asynchronously fetch and log the client's hostname or CID.
Args:
client (ClientProxy): The client proxy.
"""
# Optional: uncomment to fetch hostname from client properties
# try:
# ins = GetPropertiesIns(config={})
# props = await client.get_properties(ins=ins, timeout=10.0, group_id=0)
# hostname = props.properties.get("hostname", "unknown")
# except Exception as e:
# hostname = f"Error: {e}"
print(f"✅ Client connected - CID: {client.cid}")
self.server.connected_clients.append(client.cid)