FarmVibes.AI Client

class vibe_core.client.Client

Bases: ABC

An abstract base class for clients.

abstract list_workflows()

Lists all available workflows.

Returns:

List[str] – A list of workflow names.

Raises:

NotImplementedError – If the method is not implemented by a subclass.

abstract run(workflow, geometry, time_range)

Runs a workflow.

Parameters:
  • workflow (str) – The name of the workflow to run.

  • geometry (BaseGeometry) – The geometry to run the workflow on.

  • time_range (Tuple[datetime, datetime]) – The time range to run the workflow on.

Returns:

WorkflowRun – A WorkflowRun object.

Raises:

NotImplementedError – If the method is not implemented by a subclass.

class vibe_core.client.ClusterType(value)

Bases: StrEnum

An enumeration of cluster types.

class vibe_core.client.FarmvibesAiClient(baseurl)

Bases: Client

A client for the FarmVibes.AI service.

Parameters:

baseurl (str) – The base URL of the FarmVibes.AI service.

_form_payload(workflow, parameters, geometry, time_range, input_data, run_name)

Forms a payload dictionary for submitting a workflow run.

Parameters:
  • workflow (Union[str, Dict[str, Any]]) – The name of the workflow to run or a dict containing the workflow definition.

  • parameters (Optional[Dict[str, Any]]) – A dict of optional parameters to pass to the workflow. The keys and values depend on the specific workflow definition.

  • geometry (Optional[BaseGeometry]) – The geometry to use for the input data. It must be a valid shapely geometry object (e.g., Point or Polygon). It will be converted to GeoJSON format internally. Alternatively it can be None if input_data is provided instead.

  • time_range (Optional[Tuple[datetime, datetime]]) – The time range to use for the input data. It must be a tuple of two datetime objects representing the start and end dates. Alternatively it can be None if input_data is provided instead.

  • input_data (Union[Dict[str, Union[TypeVar(T, bound= BaseVibe, covariant=True), List[TypeVar(T, bound= BaseVibe, covariant=True)]]], List[TypeVar(T, bound= BaseVibe, covariant=True)], TypeVar(T, bound= BaseVibe, covariant=True), None]) – The input data to use for the workflow run. It must be an instance of InputData or one of its subclasses (e.g., SpatioTemporalJson or SpatioTemporalRaster). Alternatively it can be None if geometry and time_range are provided instead.

  • run_name (str) – The name to assign to the workflow run.

Note

Either geometry and time_range or input_data must be provided.

Warning

Providing both input_data and either geometry or time_range will result in an error.

Returns:

Dict[str, Any] – A dict containing the payload for submitting a workflow run. The keys are ‘run_name’, ‘workflow’, ‘parameters’, and ‘user_input’.
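The either/or rule for the inputs can be illustrated with a small validation step. The function below is a hypothetical stand-in written only from the description above, not the library's implementation: the name build_payload, the use of ValueError, the GeoJSON-like dict passed as geometry, and the layout of the ‘user_input’ entry are all assumptions.

```python
from datetime import datetime
from typing import Any, Dict, Optional, Tuple


def build_payload(
    workflow: str,
    run_name: str,
    geometry: Optional[Dict[str, Any]] = None,
    time_range: Optional[Tuple[datetime, datetime]] = None,
    input_data: Optional[Any] = None,
    parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical sketch of the documented input rules (not the real method)."""
    # input_data excludes both geometry and time_range.
    if input_data is not None and (geometry is not None or time_range is not None):
        raise ValueError("Provide geometry and time_range, or input_data, not both.")
    # Without input_data, geometry and time_range are both required.
    if input_data is None and (geometry is None or time_range is None):
        raise ValueError("Provide geometry and time_range, or input_data.")

    if input_data is not None:
        user_input = input_data
    else:
        start, end = time_range
        # Assumed layout: the real structure of 'user_input' is not documented here.
        user_input = {
            "geometry": geometry,
            "start_date": start.isoformat(),
            "end_date": end.isoformat(),
        }

    # The four top-level keys are the ones listed in the Returns entry.
    return {
        "run_name": run_name,
        "workflow": workflow,
        "parameters": parameters,
        "user_input": user_input,
    }
```

Checking the inputs before building the dictionary keeps an invalid combination from ever reaching the service.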

cancel_run(run_id)

Cancels a workflow run.

This method sends a request to the FarmVibes.AI service to cancel a workflow run that is in progress or pending. If the cancellation is successful, the workflow run status will be set to ‘cancelled’.

Note

The cancellation may take some time to take effect depending on the state of the workflow run and the service availability.

Warning

A workflow run that is already completed or failed cannot be cancelled.

Parameters:

run_id (str) – The ID of the workflow run to cancel.

Returns:

str – The message from the FarmVibes.AI service indicating whether the cancellation was successful or not.

delete_run(run_id)

Deletes a workflow run.

This method sends a request to the FarmVibes.AI service to delete a completed workflow run (i.e., a run with a status of ‘done’, ‘failed’, or ‘cancelled’). If the deletion is successful, all cached data that the workflow run produced and that is not shared with other workflow runs will be deleted, and the run status will be set to ‘deleted’.

Note

The deletion may take some time to take effect depending on the state of the workflow run and the service availability.

Warning

A workflow run that is in progress or pending cannot be deleted, but a cancelled run can. To delete a run that is in progress or pending, cancel it first and then delete it.

Parameters:

run_id (str) – The ID of the workflow run to delete.

Returns:

str – The message from the FarmVibes.AI service indicating whether the deletion request was successful or not.
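The cancel-then-delete sequence described in the warning above might look like the sketch below. It uses only methods documented on this page (get_run_by_id, cancel, block_until_cancelled, delete, block_until_deleted); the helper name cancel_then_delete and the 60-second default timeout are illustrative assumptions.

```python
from typing import Any, Optional


def cancel_then_delete(client: Any, run_id: str, timeout_s: Optional[int] = 60) -> None:
    """Cancel an active run, wait for the cancellation, then delete the run.

    `client` is expected to behave like FarmvibesAiClient. The sketch assumes
    the run is pending or in progress, since finished runs cannot be cancelled.
    """
    run = client.get_run_by_id(run_id)
    run.cancel()
    # Cancellation is not immediate; RuntimeError is documented on timeout.
    run.block_until_cancelled(timeout_s=timeout_s)
    run.delete()
    run.block_until_deleted(timeout_s=timeout_s)
```

Waiting between the two requests matters because a run that is still in progress or pending cannot be deleted.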

describe_run(run_id)

Describes a workflow run.

This method returns a RunConfigUser object containing the description of a workflow run, such as its name, status, inputs and outputs.

Parameters:

run_id (str) – The ID of the workflow run to describe.

Returns:

RunConfigUser – A RunConfigUser object containing the workflow run description.

describe_workflow(workflow_name)

Describes a workflow.

This method returns a dictionary containing the description of a workflow, such as its inputs, outputs, parameters and tasks.

Note

The description is returned as a TaskDescription object, which is a dataclass that represents the structure and properties of a workflow.

Parameters:

workflow_name (str) – The name of the workflow to describe. It must be one of the names returned by list_workflows().

Returns:

Dict[str, Any] – A dictionary containing the workflow description. The keys are ‘name’, ‘description’, ‘inputs’, ‘outputs’ and ‘parameters’.

document_workflow(workflow_name)

Prints the documentation of a workflow.

This method prints a formatted documentation of a workflow, including its name, description, inputs, outputs and parameters.

Note

The documentation is printed to stdout and can be redirected to other outputs if needed.

Parameters:

workflow_name (str) – The name of the workflow to document.

Return type:

None

get_api_time_zone()

Gets the time zone of the FarmVibes.AI REST-API.

This method returns a tzfile object representing the time zone of the FarmVibes.AI REST-API. The time zone is determined by parsing the ‘date’ header from the response of a GET request to the base URL of the service. If the ‘date’ header is missing or invalid, a warning is issued and the client time zone is used instead.

Note

The tzfile object is a subclass of datetime.tzinfo that represents a time zone using an Olson database file.

Returns:

tzfile – The time zone of the FarmVibes.AI REST-API as a tzfile object.
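The header-parsing step with its fallback can be sketched with the standard library alone. This is an illustration of the described behavior, not the client's code: the function name tz_from_date_header is hypothetical, and it returns a datetime.tzinfo from the standard library rather than the tzfile object documented above.

```python
import warnings
from datetime import datetime, tzinfo
from email.utils import parsedate_to_datetime
from typing import Mapping


def tz_from_date_header(headers: Mapping[str, str]) -> tzinfo:
    """Derive a time zone from an HTTP 'date' header, else use the local zone."""
    # A plain dict lookup is case-sensitive; real HTTP header mappings usually are not.
    raw = headers.get("date")
    if raw is not None:
        try:
            parsed = parsedate_to_datetime(raw)
            if parsed.tzinfo is not None:
                return parsed.tzinfo
        except (TypeError, ValueError):
            pass  # Malformed header: fall through to the fallback below.
    warnings.warn("No usable 'date' header; using the client time zone instead.")
    # astimezone() without arguments attaches the system's local time zone.
    return datetime.now().astimezone().tzinfo
```

Falling back to the client time zone keeps the caller working when the header is absent, at the cost of a possibly wrong offset if client and server are in different zones.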

get_last_runs(n)

Gets the last ‘n’ workflow runs.

This method returns a list of VibeWorkflowRun objects containing the details of the last n workflow runs.

Parameters:

n (int) – The number of workflow runs to get (with n>0).

Returns:

List[VibeWorkflowRun] – A list of VibeWorkflowRun objects.

get_run_by_id(id)

Gets a workflow run by ID.

This method returns a VibeWorkflowRun object containing the details of a workflow run by its ID.

Parameters:

id (str) – The ID of the workflow run to get.

Returns:

VibeWorkflowRun – A VibeWorkflowRun object.

get_system_metrics()

Gets system metrics from the FarmVibes.AI service.

This method returns a dictionary containing various system metrics, such as CPU usage, memory usage and disk space.

Returns:

MetricsDict – A dictionary containing system metrics.

get_workflow_yaml(workflow_name)

Gets the YAML definition of a workflow.

This method returns a string containing the YAML definition of a workflow. The YAML definition specifies the name and operations of the workflow in a human-readable format.

Parameters:

workflow_name (str) – The name of the workflow. It must be one of the names returned by list_workflows().

Returns:

str – The YAML definition of the workflow.

list_runs(ids=None, fields=None)

Lists workflow runs on the FarmVibes.AI service.

Parameters:
  • ids (Union[str, List[str], None], default: None) – The IDs of the workflow runs to list. If None, all workflow runs will be listed.

  • fields (Union[str, List[str], None], default: None) – The fields to return for each workflow run. If None, all fields will be returned.

Returns:

A list of workflow runs. Each run is represented by a dictionary with keys corresponding to the requested fields and values containing the field values.

list_workflows()

Lists all available workflows on the FarmVibes.AI service.

This method returns a list of workflow names that can be used to submit workflow runs or to get more details about a specific workflow.

Returns:

List[str] – A list of workflow names.

monitor(runs=1, refresh_time_s=1, refresh_warnings_time_min=5, timeout_min=None, detailed_task_info=False)

Monitors workflow runs.

This method blocks and prints the status of the runs every refresh_time_s seconds, until the workflow runs finish or timeout_min minutes have elapsed. It also prints warnings every refresh_warnings_time_min minutes.

Parameters:
  • runs (Union[List[VibeWorkflowRun], VibeWorkflowRun, int], default: 1) – A list of workflow runs, a single run object, or an integer. The method will monitor the provided workflow runs. If a list of runs is provided, the method will provide a summarized table with the status of each run. If only one run is provided, the method will monitor that run directly. If an integer > 0 is provided, the method will fetch the respective last runs and provide the summarized monitor table.

  • refresh_time_s (int, default: 1) – Refresh interval in seconds (defaults to 1 second).

  • refresh_warnings_time_min (int, default: 5) – Refresh interval in minutes for updating the warning messages (defaults to 5 minutes).

  • timeout_min (Optional[int], default: None) – The maximum time to monitor the workflow run, in minutes. If not provided, the method will monitor indefinitely.

  • detailed_task_info (bool, default: False) – If True, detailed information about task progress will be included in the output (defaults to False).

Raises:

ValueError – If no workflow runs are provided (empty list).

Return type:

None
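As a usage sketch, the helper below forwards a list of runs to monitor() with a slower refresh interval than the default. The helper name monitor_runs and the chosen intervals are assumptions; only the keyword arguments listed above are used.

```python
from typing import Any, List, Optional


def monitor_runs(client: Any, runs: List[Any], timeout_min: Optional[int] = 30) -> None:
    """Monitor several runs, refreshing every 5 seconds instead of every second.

    `client` is expected to behave like FarmvibesAiClient.
    """
    if not runs:
        # Mirrors the documented ValueError for an empty list, raised before any request.
        raise ValueError("At least one workflow run is required.")
    client.monitor(
        runs,
        refresh_time_s=5,
        refresh_warnings_time_min=5,
        timeout_min=timeout_min,
        detailed_task_info=True,
    )
```

Passing an explicit timeout_min avoids blocking indefinitely, which is what happens when it is left at None.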

resubmit_run(run_id)

Resubmits a workflow run with the given run ID.

Parameters:

run_id (str) – The ID of the workflow run to resubmit.

Returns:

VibeWorkflowRun – The resubmitted workflow run.

run(workflow, name, *, geometry=None, time_range=None, input_data=None, parameters=None)

Runs a workflow.

This method instantiates a workflow run using the provided data and parameters.

Parameters:
  • workflow (Union[str, Dict[str, Any]]) – The name of the workflow to run or a dict containing the workflow definition.

  • name (str) – The name to assign to the workflow run.

  • geometry (Optional[BaseGeometry], default: None) – The geometry to use for the input data. It must be a valid shapely geometry object (e.g., Point or Polygon). It will be converted to GeoJSON format internally. Alternatively it can be None if input_data is provided instead.

  • time_range (Optional[Tuple[datetime, datetime]], default: None) – The time range to use for the input data. It must be a tuple of two datetime objects representing the start and end dates. Alternatively it can be None if input_data is provided instead.

  • input_data (Union[Dict[str, Union[TypeVar(T, bound= BaseVibe, covariant=True), List[TypeVar(T, bound= BaseVibe, covariant=True)]]], List[TypeVar(T, bound= BaseVibe, covariant=True)], TypeVar(T, bound= BaseVibe, covariant=True), None], default: None) – The input data to use for the workflow run. It must be an instance of InputData or one of its subclasses (e.g., SpatioTemporalJson or SpatioTemporalRaster). Alternatively it can be None if geometry and time_range are provided instead.

  • parameters (Optional[Dict[str, Any]], default: None) – A dict of optional parameters to pass to the workflow. The keys and values depend on the specific workflow definition.

Note

Either geometry and time_range or input_data must be provided.

Warning

Providing both input_data and either geometry or time_range will result in an error.

Returns:

VibeWorkflowRun – A VibeWorkflowRun object.
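A typical call submits a run with a geometry and a time range and then waits for it to finish. The sketch below relies only on run() and block_until_complete() as documented on this page; the helper name submit_and_wait is hypothetical, and the client is passed in so the snippet does not depend on a running service.

```python
from datetime import datetime
from typing import Any, Optional, Tuple


def submit_and_wait(
    client: Any,
    workflow: str,
    name: str,
    geometry: Any,
    time_range: Tuple[datetime, datetime],
    timeout_s: Optional[int] = None,
) -> Any:
    """Submit a run with geometry and time_range, then block until it finishes.

    `client` is expected to behave like FarmvibesAiClient, and `geometry`
    should be a shapely geometry object when a real client is used.
    """
    # geometry and time_range are keyword-only in the documented signature.
    run = client.run(workflow, name, geometry=geometry, time_range=time_range)
    # RuntimeError is documented if the run does not complete before timeout_s.
    run.block_until_complete(timeout_s=timeout_s)
    return run
```

With a real service, client could come from get_default_vibe_client() and geometry from shapely (e.g., a Point or Polygon); the returned object then exposes the status, reason, and output properties.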

verify_disk_space()

Verifies that there is enough disk space available for the cache.

This method checks the system metrics returned by the FarmVibes.AI service and compares the disk free space with a predefined threshold. If the disk free space is below the threshold, a warning message is displayed to the user, suggesting to clear the cache.

Note

The disk free space threshold is defined by DISK_FREE_THRESHOLD_BYTES.

Raises:

RuntimeWarning – If the disk space is low.
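The threshold comparison can be sketched as follows. The constant mirrors the value documented for vibe_core.client.DISK_FREE_THRESHOLD_BYTES (53687091200 bytes, i.e., 50 GiB); the function name warn_if_disk_low and its free_bytes argument are hypothetical, since the real method reads the free space from the service metrics.

```python
import warnings

# Same value as the documented DISK_FREE_THRESHOLD_BYTES: 50 GiB.
DISK_FREE_THRESHOLD_BYTES = 50 * 1024**3


def warn_if_disk_low(free_bytes: int, threshold: int = DISK_FREE_THRESHOLD_BYTES) -> bool:
    """Emit a RuntimeWarning and return True when free space is below the threshold."""
    if free_bytes < threshold:
        warnings.warn(
            f"Only {free_bytes / 1024**3:.1f} GiB of disk space left; "
            "consider clearing the cache.",
            RuntimeWarning,
        )
        return True
    return False
```

Because the condition is reported through the warnings module rather than as an exception, it can be escalated to an error with warnings.simplefilter("error", RuntimeWarning) when stricter behavior is wanted.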

default_headers: Dict[str, str] = {'Accept': 'application/json', 'Content-Type': 'application/json'}

The default headers to use for requests to the FarmVibes.AI service.

class vibe_core.client.VibeWorkflowRun(id, name, workflow, parameters, client)

Bases: WorkflowRun, MonitoredWorkflowRun

Represents a workflow run in FarmVibes.AI.

Parameters:
  • id (str) – The ID of the workflow run.

  • name (str) – The name of the workflow run.

  • workflow (str) – The name of the workflow associated with the run.

  • parameters (Dict[str, Any]) – The parameters associated with the workflow run, as a dict. The keys and values depend on the specific workflow definition.

  • client (FarmvibesAiClient) – An instance of the FarmvibesAiClient class.

block_until_cancelled(timeout_s=None)

Blocks until the workflow run is cancelled, with an optional timeout in seconds.

Parameters:

timeout_s (Optional[int], default: None) – Optional timeout in seconds to wait for the workflow to be cancelled. If not provided, the method will wait indefinitely.

Raises:

RuntimeError – If the run is not cancelled before timeout_s.

Returns:

VibeWorkflowRun – The workflow run object.

block_until_complete(timeout_s=None)

Blocks until the workflow run execution completes or fails, with an optional timeout in seconds.

Parameters:

timeout_s (Optional[int], default: None) – Optional timeout in seconds to wait for the workflow to complete. If not provided, the method will wait indefinitely.

Raises:

RuntimeError – If the run does not complete before timeout_s.

Returns:

VibeWorkflowRun – The workflow run object.

block_until_deleted(timeout_s=None)

Blocks until the workflow run is deleted, with an optional timeout in seconds.

Parameters:

timeout_s (Optional[int], default: None) – Optional timeout in seconds to wait for the workflow to be deleted. If not provided, the method will wait indefinitely.

Raises:

RuntimeError – If the run is not deleted before timeout_s.

Returns:

VibeWorkflowRun – The workflow run object.
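The three block_until_* methods share one pattern: poll a condition until it holds or the timeout expires. A generic version of that pattern is sketched below; the function name block_until, the poll_s interval, and the callable-based interface are assumptions made for illustration and do not describe the library's internals.

```python
import time
from typing import Callable, Optional


def block_until(
    condition: Callable[[], bool],
    timeout_s: Optional[float] = None,
    poll_s: float = 0.1,
) -> None:
    """Poll `condition` until it returns True; raise RuntimeError on timeout.

    With timeout_s=None the loop waits indefinitely, matching the documented default.
    """
    # time.monotonic() is unaffected by system clock adjustments.
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            raise RuntimeError(f"Condition not met within {timeout_s} seconds.")
        time.sleep(poll_s)
```

The condition is checked before the deadline, so a run that is already in the target state returns immediately even with a timeout of zero.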

cancel()

Cancels the workflow run.

Returns:

VibeWorkflowRun – The workflow run.

delete()

Deletes the workflow run.

Returns:

VibeWorkflowRun – The workflow run.

monitor(refresh_time_s=1, refresh_warnings_time_min=5, timeout_min=None, detailed_task_info=False)

Monitors the workflow run.

This method will call vibe_core.client.FarmvibesAiClient.monitor() to monitor the workflow run.

Parameters:
  • refresh_time_s (int, default: 1) – Refresh interval in seconds (defaults to 1 second).

  • refresh_warnings_time_min (int, default: 5) – Refresh interval in minutes for updating the warning messages (defaults to 5 minutes).

  • timeout_min (Optional[int], default: None) – The maximum time to monitor the workflow run, in minutes. If not provided, the method will monitor indefinitely.

  • detailed_task_info (bool, default: False) – If True, detailed information about task progress will be included in the output (defaults to False).

resubmit()

Resubmits the current workflow run.

Returns:

VibeWorkflowRun – The resubmitted workflow run instance.

id: str

The id of the run.

name: str

The name of the run.

property output: Dict[str, BaseVibe | List[BaseVibe]] | None

Gets the output of the workflow run.

property reason: str | None

Gets the reason of the workflow run.

The reason is a string that describes the status of the workflow run. If the run failed, it also contains the reason for the failure.

property status: RunStatus

Gets the status of the workflow run.

property task_details: Dict[str, RunDetails]

Gets the task details of the workflow run.

property task_status: Dict[str, str]

Gets the task status of the workflow run.

workflow: Union[str, Dict[str, Any]]

The workflow name or workflow dictionary definition of the run.

vibe_core.client.get_default_vibe_client(type='')

Gets the default vibe client.

If given a cluster type, it will try to connect to a cluster of that type assuming the data files are present.

Otherwise, it will try to connect to a remote cluster first, and if that fails, try to connect to the local one (if it exists).

Parameters:

type (Union[str, ClusterType], default: '') – The type of the cluster (from ClusterType) to connect to.

Returns:

The vibe client.

vibe_core.client.get_local_service_url()

Retrieves the local service URL used to submit workflow runs to the FarmVibes.AI service.

This function attempts to read the service URL from a file, and if that fails, it will return a fallback URL.

Returns:

str – The local service URL.
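The read-with-fallback behavior can be sketched as below. The function name read_service_url is hypothetical, and treating an empty file like a missing one is an assumption; the real function uses FARMVIBES_AI_SERVICE_URL_PATH and FALLBACK_SERVICE_URL, whose values are not shown on this page.

```python
from pathlib import Path


def read_service_url(path: Path, fallback: str) -> str:
    """Return the URL stored in `path`, or `fallback` if the file cannot be used."""
    try:
        url = path.read_text().strip()
    except OSError:
        # Covers a missing file, a directory in its place, or a permission error.
        return fallback
    # Assumption: an empty file is treated the same as a missing one.
    return url or fallback
```

Stripping whitespace guards against a trailing newline in the file, which would otherwise end up in every request URL.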

vibe_core.client.get_remote_service_url()

Gets the remote service URL.

Returns:

str – The remote service URL.

vibe_core.client.get_vibe_client(url)

Gets a vibe client given an API base URL.

Parameters:

url (str) – The URL.

Returns:

The vibe client.

vibe_core.client.DISK_FREE_THRESHOLD_BYTES = 53687091200

Threshold for free disk space in bytes (50 GiB).

vibe_core.client.FALLBACK_SERVICE_URL

Fallback URL for FarmVibes.AI service.

vibe_core.client.FARMVIBES_AI_REMOTE_SERVICE_URL_PATH

Path to the local file that stores the remote service URL.

vibe_core.client.FARMVIBES_AI_SERVICE_URL_PATH

Path to the local service URL file.

vibe_core.client.TASK_SORT_KEY = 'submission_time'

Key for sorting tasks.

vibe_core.client.XDG_CONFIG_HOME

Path to the configuration directory for the FarmVibes.AI service.