# Copyright 2024 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The cluster sub-module contains the definition of the Cluster object, which represents
the resources requested by the user. It also contains functions for checking the
cluster setup queue, listing all existing clusters, and retrieving the user's working namespace.
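
A typical lifecycle, shown as an illustrative sketch (the configuration values and the
package-level import path are placeholders, not requirements):

    from codeflare_sdk import Cluster, ClusterConfiguration

    cluster = Cluster(ClusterConfiguration(name="raytest", namespace="default", num_workers=2))
    cluster.up()          # submit the resource request
    cluster.wait_ready()  # block until the Ray cluster and dashboard are up
    cluster.down()        # tear the resources back down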
"""
from time import sleep
from typing import List, Optional, Tuple, Dict
from ray.job_submission import JobSubmissionClient
from ...common.kubernetes_cluster.auth import (
config_check,
get_api_client,
)
from . import pretty_print
from .generate_yaml import (
generate_appwrapper,
head_worker_gpu_count_from_cluster,
is_openshift_cluster,
)
from ...common import _kube_api_error_handling
from .config import ClusterConfiguration
from .status import (
CodeFlareClusterStatus,
RayCluster,
RayClusterStatus,
)
from ..appwrapper import (
AppWrapper,
AppWrapperStatus,
)
from ...common.widgets.widgets import (
cluster_up_down_buttons,
is_notebook,
)
from kubernetes import client, config
import yaml
import os
import requests
from kubernetes.client.rest import ApiException
class Cluster:
"""
An object for requesting, bringing up, and taking down resources.
Can also be used for seeing the resource cluster status and details.
Note that currently, the underlying implementation is a Ray cluster.
"""
def __init__(self, config: ClusterConfiguration):
"""
Create the resource cluster object by passing in a ClusterConfiguration
(defined in the config sub-module). An AppWrapper will then be generated
based off of the configured resources to represent the desired cluster
request.
"""
self.config = config
self.app_wrapper_yaml = self.create_app_wrapper()
self._job_submission_client = None
self.app_wrapper_name = self.config.name
if is_notebook():
cluster_up_down_buttons(self)
@property
def _client_headers(self):
k8_client = get_api_client()
return {
"Authorization": k8_client.configuration.get_api_key_with_prefix(
"authorization"
)
}
@property
def _client_verify_tls(self):
if not is_openshift_cluster() or not self.config.verify_tls:
return False
return True
@property
def job_client(self):
k8client = get_api_client()
if self._job_submission_client:
return self._job_submission_client
if is_openshift_cluster():
self._job_submission_client = JobSubmissionClient(
self.cluster_dashboard_uri(),
headers=self._client_headers,
verify=self._client_verify_tls,
)
else:
self._job_submission_client = JobSubmissionClient(
self.cluster_dashboard_uri()
)
return self._job_submission_client
def create_app_wrapper(self):
"""
Called upon cluster object creation, creates an AppWrapper yaml based on
the specifications of the ClusterConfiguration.
"""
if self.config.namespace is None:
self.config.namespace = get_current_namespace()
if self.config.namespace is None:
print("Please specify with namespace=<your_current_namespace>")
elif type(self.config.namespace) is not str:
raise TypeError(
f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication."
)
return generate_appwrapper(self)
# creates a new cluster with the provided or default spec
def up(self):
"""
Applies the Cluster yaml, pushing the resource request onto
the Kueue localqueue.
"""
# check that the RayCluster CustomResourceDefinition exists; if not, raise a RuntimeError
self._throw_for_no_raycluster()
namespace = self.config.namespace
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
if self.config.appwrapper:
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
)
else:
aw = yaml.safe_load(self.app_wrapper_yaml)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
)
print(f"AppWrapper: '{self.config.name}' has successfully been created")
else:
self._component_resources_up(namespace, api_instance)
print(
f"Ray Cluster: '{self.config.name}' has successfully been created"
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
def _throw_for_no_raycluster(self):
api_instance = client.CustomObjectsApi(get_api_client())
try:
api_instance.list_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=self.config.namespace,
plural="rayclusters",
)
except ApiException as e:
if e.status == 404:
raise RuntimeError(
"RayCluster CustomResourceDefinition unavailable contact your administrator."
)
else:
raise RuntimeError(
"Failed to get RayCluster CustomResourceDefinition: " + str(e)
)
def down(self):
"""
Deletes the AppWrapper yaml, scaling-down and deleting all resources
associated with the cluster.
"""
namespace = self.config.namespace
self._throw_for_no_raycluster()
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
if self.config.appwrapper:
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
)
print(f"AppWrapper: '{self.config.name}' has successfully been deleted")
else:
self._component_resources_down(namespace, api_instance)
print(
f"Ray Cluster: '{self.config.name}' has successfully been deleted"
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
def status(
self, print_to_console: bool = True
) -> Tuple[CodeFlareClusterStatus, bool]:
"""
Returns the requested cluster's status, as well as whether or not
it is ready for use.
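
Example (illustrative; assumes CodeFlareClusterStatus is imported from the status
sub-module):

    status, ready = cluster.status(print_to_console=False)
    if status == CodeFlareClusterStatus.QUEUED:
        print("Request is still queued in Kueue")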
"""
ready = False
status = CodeFlareClusterStatus.UNKNOWN
if self.config.appwrapper:
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
AppWrapperStatus.RESUMING,
AppWrapperStatus.RESETTING,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED  # should a deleted AppWrapper map to a separate status?
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
AppWrapperStatus.SUSPENDED,
AppWrapperStatus.SUSPENDING,
]:
ready = False
if appwrapper.status == AppWrapperStatus.SUSPENDED:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
if print_to_console:
pretty_print.print_app_wrappers_status([appwrapper])
return (
status,
ready,
) # no need to check the ray status since still in queue
# check the ray cluster status
cluster = _ray_cluster_status(self.config.name, self.config.namespace)
if cluster:
if cluster.status == RayClusterStatus.SUSPENDED:
ready = False
status = CodeFlareClusterStatus.SUSPENDED
if cluster.status == RayClusterStatus.UNKNOWN:
ready = False
status = CodeFlareClusterStatus.STARTING
if cluster.status == RayClusterStatus.READY:
ready = True
status = CodeFlareClusterStatus.READY
elif cluster.status in [
RayClusterStatus.UNHEALTHY,
RayClusterStatus.FAILED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED
if print_to_console:
# override the reported GPU count with the requested GPU count
_, cluster.worker_gpu = head_worker_gpu_count_from_cluster(self)
pretty_print.print_cluster_status(cluster)
elif print_to_console:
if status == CodeFlareClusterStatus.UNKNOWN:
pretty_print.print_no_resources_found()
else:
pretty_print.print_app_wrappers_status([appwrapper], starting=True)
return status, ready
def is_dashboard_ready(self) -> bool:
"""
Checks whether the cluster's dashboard endpoint is reachable and returning HTTP 200.
"""
try:
response = requests.get(
self.cluster_dashboard_uri(),
headers=self._client_headers,
timeout=5,
verify=self._client_verify_tls,
)
except requests.exceptions.SSLError:  # pragma: no cover
# SSL exception occurs when oauth ingress has been created but cluster is not up
return False
return response.status_code == 200
def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True):
"""
Waits for the requested cluster to be ready, up to an optional timeout in seconds.
Checks every five seconds.
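
Example (illustrative; the 300-second timeout is an arbitrary choice):

    cluster.wait_ready(timeout=300, dashboard_check=False)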
"""
print("Waiting for requested resources to be set up...")
time = 0
while True:
if timeout and time >= timeout:
raise TimeoutError(
f"wait() timed out after waiting {timeout}s for cluster to be ready"
)
status, ready = self.status(print_to_console=False)
if status == CodeFlareClusterStatus.UNKNOWN:
print(
"WARNING: Current cluster status is unknown, have you run cluster.up yet?"
)
if ready:
break
sleep(5)
time += 5
print("Requested cluster is up and running!")
while dashboard_check:
if timeout and time >= timeout:
raise TimeoutError(
f"wait() timed out after waiting {timeout}s for dashboard to be ready"
)
if self.is_dashboard_ready():
print("Dashboard is ready!")
break
sleep(5)
time += 5
def details(self, print_to_console: bool = True) -> RayCluster:
"""
Returns a RayCluster object describing this cluster's requested resources and
current status (and prints it by default).
"""
cluster = _copy_to_ray(self)
if print_to_console:
pretty_print.print_clusters([cluster])
return cluster
def cluster_uri(self) -> str:
"""
Returns a string containing the cluster's URI.
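
The address points at the head service inside the Kubernetes cluster, so it is
typically only reachable from within that cluster; local_client_url() provides an
ingress-based alternative for external clients. Illustrative use with the Ray client
(assumes the ray package is available):

    import ray
    ray.init(address=cluster.cluster_uri())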
"""
return f"ray://{self.config.name}-head-svc.{self.config.namespace}.svc:10001"
def cluster_dashboard_uri(self) -> str:
"""
Returns a string containing the cluster's dashboard URI.
"""
config_check()
if is_openshift_cluster():
try:
api_instance = client.CustomObjectsApi(get_api_client())
routes = api_instance.list_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=self.config.namespace,
plural="routes",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for route in routes["items"]:
if route["metadata"][
"name"
] == f"ray-dashboard-{self.config.name}" or route["metadata"][
"name"
].startswith(
f"{self.config.name}-ingress"
):
protocol = "https" if route["spec"].get("tls") else "http"
return f"{protocol}://{route['spec']['host']}"
else:
try:
api_instance = client.NetworkingV1Api(get_api_client())
ingresses = api_instance.list_namespaced_ingress(self.config.namespace)
except Exception as e:  # pragma: no cover
return _kube_api_error_handling(e)
for ingress in ingresses.items:
annotations = ingress.metadata.annotations
protocol = "http"
if (
ingress.metadata.name == f"ray-dashboard-{self.config.name}"
or ingress.metadata.name.startswith(f"{self.config.name}-ingress")
):
if annotations is None:
protocol = "http"
elif "route.openshift.io/termination" in annotations:
protocol = "https"
return f"{protocol}://{ingress.spec.rules[0].host}"
return "Dashboard not available yet, have you run cluster.up()?"
def list_jobs(self) -> List:
"""
This method accesses the Ray head node in your cluster and lists the running jobs.
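
Example of the surrounding job workflow (illustrative; the entrypoint command is a
placeholder):

    submission_id = cluster.job_client.submit_job(entrypoint="python my_script.py")
    cluster.list_jobs()
    cluster.job_status(submission_id)
    cluster.job_logs(submission_id)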
"""
return self.job_client.list_jobs()
def job_status(self, job_id: str) -> str:
"""
This method accesses the Ray head node in your cluster and returns the job status for the provided job id.
"""
return self.job_client.get_job_status(job_id)
def job_logs(self, job_id: str) -> str:
"""
This method accesses the Ray head node in your cluster and returns the logs for the provided job id.
"""
return self.job_client.get_job_logs(job_id)
@staticmethod
def _head_worker_extended_resources_from_rc_dict(rc: Dict) -> Tuple[dict, dict]:
head_extended_resources, worker_extended_resources = {}, {}
for resource in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"].keys():
if resource in ["memory", "cpu"]:
continue
worker_extended_resources[resource] = rc["spec"]["workerGroupSpecs"][0][
"template"
]["spec"]["containers"][0]["resources"]["limits"][resource]
for resource in rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][
0
]["resources"]["limits"].keys():
if resource in ["memory", "cpu"]:
continue
head_extended_resources[resource] = rc["spec"]["headGroupSpec"]["template"][
"spec"
]["containers"][0]["resources"]["limits"][resource]
return head_extended_resources, worker_extended_resources
def from_k8_cluster_object(
rc,
appwrapper=True,
write_to_file=False,
verify_tls=True,
):
"""
Builds a Cluster object from a RayCluster resource dictionary fetched from the
Kubernetes API; used by get_cluster to re-attach to an existing cluster.
"""
config_check()
machine_types = (
rc["metadata"]["labels"]["orderedinstance"].split("_")
if "orderedinstance" in rc["metadata"]["labels"]
else []
)
(
head_extended_resources,
worker_extended_resources,
) = Cluster._head_worker_extended_resources_from_rc_dict(rc)
cluster_config = ClusterConfiguration(
name=rc["metadata"]["name"],
namespace=rc["metadata"]["namespace"],
machine_types=machine_types,
head_cpu_requests=rc["spec"]["headGroupSpec"]["template"]["spec"][
"containers"
][0]["resources"]["requests"]["cpu"],
head_cpu_limits=rc["spec"]["headGroupSpec"]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["cpu"],
head_memory_requests=rc["spec"]["headGroupSpec"]["template"]["spec"][
"containers"
][0]["resources"]["requests"]["memory"],
head_memory_limits=rc["spec"]["headGroupSpec"]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["memory"],
num_workers=rc["spec"]["workerGroupSpecs"][0]["minReplicas"],
worker_cpu_requests=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["requests"]["cpu"],
worker_cpu_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["cpu"],
worker_memory_requests=rc["spec"]["workerGroupSpecs"][0]["template"][
"spec"
]["containers"][0]["resources"]["requests"]["memory"],
worker_memory_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["memory"],
worker_extended_resource_requests=worker_extended_resources,
head_extended_resource_requests=head_extended_resources,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["image"],
appwrapper=appwrapper,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=rc["metadata"]
.get("labels", dict())
.get("kueue.x-k8s.io/queue-name", None),
)
return Cluster(cluster_config)
def local_client_url(self):
"""
Returns the Ray client URL (ray://<ingress domain>) for connecting to the cluster
from outside the Kubernetes cluster.
"""
ingress_domain = _get_ingress_domain(self)
return f"ray://{ingress_domain}"
def _component_resources_up(
self, namespace: str, api_instance: client.CustomObjectsApi
):
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
yamls = list(yaml.load_all(f, Loader=yaml.FullLoader))
for resource in yamls:
enable_ingress = (
resource.get("spec", {})
.get("headGroupSpec", {})
.get("enableIngress")
)
if resource["kind"] == "RayCluster" and enable_ingress is True:
name = resource["metadata"]["name"]
print(
f"Forbidden: RayCluster '{name}' has 'enableIngress' set to 'True'."
)
return
_create_resources(yamls, namespace, api_instance)
else:
yamls = yaml.load_all(self.app_wrapper_yaml, Loader=yaml.FullLoader)
_create_resources(yamls, namespace, api_instance)
def _component_resources_down(
self, namespace: str, api_instance: client.CustomObjectsApi
):
cluster_name = self.config.name
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
_delete_resources(yamls, namespace, api_instance, cluster_name)
else:
yamls = yaml.safe_load_all(self.app_wrapper_yaml)
_delete_resources(yamls, namespace, api_instance, cluster_name)
def list_all_clusters(namespace: str, print_to_console: bool = True):
"""
Returns (and prints by default) a list of all clusters in a given namespace.
"""
clusters = _get_ray_clusters(namespace)
if print_to_console:
pretty_print.print_clusters(clusters)
return clusters
def list_all_queued(
namespace: str, print_to_console: bool = True, appwrapper: bool = False
):
"""
Returns (and prints by default) a list of all currently queued-up Ray clusters
(or AppWrappers, when appwrapper=True) in a given namespace.
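
Example (illustrative; the namespace is a placeholder):

    list_all_queued("my-namespace", appwrapper=True)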
"""
if appwrapper:
resources = _get_app_wrappers(namespace, filter=[AppWrapperStatus.SUSPENDED])
if print_to_console:
pretty_print.print_app_wrappers_status(resources)
else:
resources = _get_ray_clusters(
namespace, filter=[RayClusterStatus.READY, RayClusterStatus.SUSPENDED]
)
if print_to_console:
pretty_print.print_ray_clusters_status(resources)
return resources
def get_current_namespace():  # pragma: no cover
"""
Returns the current Kubernetes namespace, read from the in-cluster service account
file when available, otherwise from the active kubeconfig context.
"""
if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"):
try:
with open(
"/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r"
) as f:
return f.readline().strip("\n")
except Exception:
print("Unable to find current namespace")
print("Trying to gather the namespace from the current context")
try:
_, active_context = config.list_kube_config_contexts(config_check())
except Exception as e:
return _kube_api_error_handling(e)
try:
return active_context["context"]["namespace"]
except KeyError:
return None
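# Illustrative use of get_cluster (below) to re-attach to an existing Ray cluster
# from a new session; the cluster and namespace names are placeholders:
#
#     cluster = get_cluster("raytest", namespace="default")
#     cluster.details()
#     cluster.down()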
def get_cluster(
cluster_name: str,
namespace: str = "default",
write_to_file: bool = False,
verify_tls: bool = True,
):
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
rcs = api_instance.list_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
)
except Exception as e:
return _kube_api_error_handling(e)
for rc in rcs["items"]:
if rc["metadata"]["name"] == cluster_name:
appwrapper = _check_aw_exists(cluster_name, namespace)
return Cluster.from_k8_cluster_object(
rc,
appwrapper=appwrapper,
write_to_file=write_to_file,
verify_tls=verify_tls,
)
raise FileNotFoundError(
f"Cluster {cluster_name} is not found in {namespace} namespace"
)
# private methods
def _delete_resources(
yamls, namespace: str, api_instance: client.CustomObjectsApi, cluster_name: str
):
for resource in yamls:
if resource["kind"] == "RayCluster":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
name=name,
)
def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsApi):
for resource in yamls:
if resource["kind"] == "RayCluster":
api_instance.create_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
body=resource,
)
def _check_aw_exists(name: str, namespace: str) -> bool:
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e, print_error=False)
for aw in aws["items"]:
if aw["metadata"]["name"] == name:
return True
return False
# Can't test this until get_current_namespace is fixed and used inside this function instead of relying on `self`
def _get_ingress_domain(self): # pragma: no cover
config_check()
if self.config.namespace is not None:
namespace = self.config.namespace
else:
namespace = get_current_namespace()
domain = None
if is_openshift_cluster():
try:
api_instance = client.CustomObjectsApi(get_api_client())
routes = api_instance.list_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for route in routes["items"]:
if (
route["spec"]["port"]["targetPort"] == "client"
or route["spec"]["port"]["targetPort"] == 10001
):
domain = route["spec"]["host"]
else:
try:
api_client = client.NetworkingV1Api(get_api_client())
ingresses = api_client.list_namespaced_ingress(namespace)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for ingress in ingresses.items:
if ingress.spec.rules[0].http.paths[0].backend.service.port.number == 10001:
domain = ingress.spec.rules[0].host
return domain
def _app_wrapper_status(name, namespace="default") -> Optional[AppWrapper]:
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for aw in aws["items"]:
if aw["metadata"]["name"] == name:
return _map_to_app_wrapper(aw)
return None
def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]:
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
rcs = api_instance.list_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for rc in rcs["items"]:
if rc["metadata"]["name"] == name:
return _map_to_ray_cluster(rc)
return None
def _get_ray_clusters(
namespace="default", filter: Optional[List[RayClusterStatus]] = None
) -> List[RayCluster]:
list_of_clusters = []
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
rcs = api_instance.list_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
# Get a list of RCs with the filter if it is passed to the function
if filter is not None:
for rc in rcs["items"]:
ray_cluster = _map_to_ray_cluster(rc)
if filter and ray_cluster.status in filter:
list_of_clusters.append(ray_cluster)
else:
for rc in rcs["items"]:
list_of_clusters.append(_map_to_ray_cluster(rc))
return list_of_clusters
def _get_app_wrappers(
namespace="default", filter: Optional[List[AppWrapperStatus]] = None
) -> List[AppWrapper]:
list_of_app_wrappers = []
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for item in aws["items"]:
app_wrapper = _map_to_app_wrapper(item)
# apply the status filter only when one is provided; otherwise include every AppWrapper
if filter is None or app_wrapper.status in filter:
list_of_app_wrappers.append(app_wrapper)
return list_of_app_wrappers
def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
if "status" in rc and "state" in rc["status"]:
status = RayClusterStatus(rc["status"]["state"].lower())
else:
status = RayClusterStatus.UNKNOWN
config_check()
dashboard_url = None
if is_openshift_cluster():
try:
api_instance = client.CustomObjectsApi(get_api_client())
routes = api_instance.list_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=rc["metadata"]["namespace"],
plural="routes",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
for route in routes["items"]:
rc_name = rc["metadata"]["name"]
if route["metadata"]["name"] == f"ray-dashboard-{rc_name}" or route[
"metadata"
]["name"].startswith(f"{rc_name}-ingress"):
protocol = "https" if route["spec"].get("tls") else "http"
dashboard_url = f"{protocol}://{route['spec']['host']}"
else:
try:
api_instance = client.NetworkingV1Api(get_api_client())
ingresses = api_instance.list_namespaced_ingress(
rc["metadata"]["namespace"]
)
except Exception as e:  # pragma: no cover
return _kube_api_error_handling(e)
for ingress in ingresses.items:
annotations = ingress.metadata.annotations
protocol = "http"
if (
ingress.metadata.name == f"ray-dashboard-{rc['metadata']['name']}"
or ingress.metadata.name.startswith(f"{rc['metadata']['name']}-ingress")
):
if annotations is None:
protocol = "http"
elif "route.openshift.io/termination" in annotations:
protocol = "https"
dashboard_url = f"{protocol}://{ingress.spec.rules[0].host}"
(
head_extended_resources,
worker_extended_resources,
) = Cluster._head_worker_extended_resources_from_rc_dict(rc)
return RayCluster(
name=rc["metadata"]["name"],
status=status,
# for now we are not using autoscaling so same replicas is fine
num_workers=rc["spec"]["workerGroupSpecs"][0]["replicas"],
worker_mem_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["memory"],
worker_mem_requests=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["requests"]["memory"],
worker_cpu_requests=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["requests"]["cpu"],
worker_cpu_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["cpu"],
worker_extended_resources=worker_extended_resources,
namespace=rc["metadata"]["namespace"],
head_cpu_requests=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][
0
]["resources"]["requests"]["cpu"],
head_cpu_limits=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][
0
]["resources"]["limits"]["cpu"],
head_mem_requests=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][
0
]["resources"]["requests"]["memory"],
head_mem_limits=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][
0
]["resources"]["limits"]["memory"],
head_extended_resources=head_extended_resources,
dashboard=dashboard_url,
)
def _map_to_app_wrapper(aw) -> AppWrapper:
if "status" in aw:
return AppWrapper(
name=aw["metadata"]["name"],
status=AppWrapperStatus(aw["status"]["phase"].lower()),
)
return AppWrapper(
name=aw["metadata"]["name"],
status=AppWrapperStatus("suspended"),
)
def _copy_to_ray(cluster: Cluster) -> RayCluster:
ray = RayCluster(
name=cluster.config.name,
status=cluster.status(print_to_console=False)[0],
num_workers=cluster.config.num_workers,
worker_mem_requests=cluster.config.worker_memory_requests,
worker_mem_limits=cluster.config.worker_memory_limits,
worker_cpu_requests=cluster.config.worker_cpu_requests,
worker_cpu_limits=cluster.config.worker_cpu_limits,
worker_extended_resources=cluster.config.worker_extended_resource_requests,
namespace=cluster.config.namespace,
dashboard=cluster.cluster_dashboard_uri(),
head_mem_requests=cluster.config.head_memory_requests,
head_mem_limits=cluster.config.head_memory_limits,
head_cpu_requests=cluster.config.head_cpu_requests,
head_cpu_limits=cluster.config.head_cpu_limits,
head_extended_resources=cluster.config.head_extended_resource_requests,
)
if ray.status == CodeFlareClusterStatus.READY:
ray.status = RayClusterStatus.READY
return ray