import enum
import json
import os
import pathlib
import shlex
import shutil
import subprocess
import time
from dataclasses import is_dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TypeVar
import dotenv
import psutil
import yaml
[docs]
def terminate_process_and_children(pid: int) -> None:
    """
    Terminate a process and all its child processes.
    Args:
        pid (int): The process ID of the parent process.
    """
    process_id = psutil.Process(pid)
    children = process_id.children(recursive=True)
    for child in children:
        child.terminate()
    process_id.terminate() 
[docs]
def wait_for_files_creation(
    file_paths: list[Path],
    timeout: float,
    poll_interval: float = 1,
) -> bool:
    if timeout < 0:
        msg = "Timeout must be non-negative"
        raise ValueError(msg)
    if poll_interval < 0:
        msg = "Poll interval must be non-negative"
        raise ValueError(msg)
    start_time = time.time()
    while True:
        files_exsit_list = [file_path.exists() for file_path in file_paths]
        all_files_exist = all(files_exsit_list)
        if all_files_exist:
            return True  # Return True if all files were found
        if time.time() - start_time > timeout:
            return False  # Return False if the timeout is exceeded
        time.sleep(poll_interval) 
[docs]
def find_bin_path(cmd: str) -> str:
    """
    Find the path of a binary executable.
    Args:
        cmd (str): The name of the binary executable.
    Returns:
        str: The path of the binary executable.
    Raises:
        RuntimeError: If the binary executable cannot be found.
    """
    bin_path = shutil.which(cmd)
    if bin_path is None:
        msg = (
            f"Could not find `{cmd}` automatically (via which), please specify the `cmd`"
            " path manually."
        )
        raise RuntimeError(msg)
    return bin_path 
[docs]
def log_execution_time_to_file(
    design_dir: Path,
    flow_name: str,
    t_0: float,
    t_1: float,
) -> None:
    """
    Logs the execution time of a specific flow to a file.
    Args:
        design_dir (Path): The directory where the design is located.
        flow_name (str): The name of the flow.
        t_0 (float): The start time of the execution.
        t_1 (float): The end time of the execution.
    Raises:
        RuntimeError: If the design directory does not exist.
    """
    dt = t_1 - t_0
    if not design_dir.exists():
        msg = f"Design directory {design_dir} does not exist."
        raise RuntimeError(msg)
    execution_time_data_fp = design_dir / "execution_time_data.json"
    if execution_time_data_fp.exists():
        execution_time_data = json.loads(execution_time_data_fp.read_text())
    else:
        execution_time_data = {}
    execution_time_data[flow_name] = {
        "t_start": t_0,
        "t_end": t_1,
        "dt": dt,
        "core": psutil.Process().cpu_num(),
    }
    execution_time_data_fp.write_text(json.dumps(execution_time_data, indent=4)) 
[docs]
class FlowTimer:
    """
    A class for measuring the execution time of a flow.
    Attributes:
        flow_name (str): The name of the flow.
        dir_path (Path): The directory path where the execution time will be logged.
        t_0 (float | None): The start time of the flow execution.
        t_1 (float | None): The stop time of the flow execution.
    """
    def __init__(self, flow_name: str, dir_path: Path) -> None:
        self.flow_name = flow_name
        self.dir_path = dir_path
        self.t_0: float | None = None
        self.t_1: float | None = None
[docs]
    def start(self: "FlowTimer") -> None:
        """
        Start the timer for measuring the flow execution time.
        """
        self.t_0 = time.time() 
[docs]
    def stop(self: "FlowTimer") -> None:
        """
        Stop the timer for measuring the flow execution time.
        """
        self.t_1 = time.time() 
[docs]
    def log(self) -> None:
        """
        Log the execution time of the flow to a file.
        Raises:
            RuntimeError: If either t_0 or t_1 is None.
        """
        if self.t_0 is None:
            msg = "t_0 is None"
            raise RuntimeError(msg)
        if self.t_1 is None:
            msg = "t_1 is None"
            raise RuntimeError(msg)
        log_execution_time_to_file(self.dir_path, self.flow_name, self.t_0, self.t_1) 
[docs]
    def __enter__(self) -> "FlowTimer":
        """
        Start the timer when entering a context.
        Returns:
            FlowTimer: The FlowTimer instance.
        """
        self.start()
        return self 
[docs]
    def __exit__(self, _exc_type, _exc_value, _traceback) -> None:  # noqa: ANN001
        """
        Stop the timer and log the execution time when exiting a context.
        """
        self.stop()
        self.log() 
 
T = TypeVar("T")
[docs]
def serialize_methods_for_dataclass(cls: type[T]) -> type[T]:
    """
    Decorator function that adds serialization methods to a dataclass.
    The serialization methods added are:
    - from_json: A class method that
    creates a dataclass instance from a JSON file.
    - to_json: An instance method that
    writes the dataclass instance to
    a JSON file.
    - from_yaml: A class method that
    creates a dataclass instance from a YAML file.
    - to_yaml: An instance method that
    writes the dataclass instance to
    a YAML file.
    Args:
        cls (type[T]): The dataclass to decorate.
    Returns:
        type[T]: The decorated dataclass.
    Raises:
        TypeError: If the decorated class is not a dataclass.
    """
    if not is_dataclass(cls):
        msg = "Decorated class must be a dataclass."
        raise TypeError(msg)
    def from_json(cls: type[T], json_path: Path) -> T:
        with json_path.open("r") as f:
            d = json.load(f)
        return cls(**d)  # Type checked return of the dataclass instance
    def to_json(self: T, json_path: Path) -> None:
        with json_path.open("w") as f:
            json.dump(self.__dict__, f, indent=4)
    def from_yaml(cls: type[T], yaml_path: Path) -> T:
        with yaml_path.open("r") as f:
            d = yaml.safe_load(f)
        return cls(**d)  # Type checked return of the dataclass instance
    def to_yaml(self: T, yaml_path: Path) -> None:
        with yaml_path.open("w") as f:
            yaml.safe_dump(self.__dict__, f)
    cls.from_json = classmethod(from_json)
    cls.to_json = to_json
    cls.from_yaml = classmethod(from_yaml)
    cls.to_yaml = to_yaml
    return cls 
[docs]
def timeout_not_supported(flow_name: str) -> None:
    """
    Raises a RuntimeError indicating that timeout is not supported for the current flow.
    Args:
        flow_name (str): The name of the current flow.
    Raises:
        RuntimeError: Indicates that timeout is not supported for the current flow.
    """
    msg = f"Timeout not supported for the current flow: {flow_name}"
    raise RuntimeError(msg) 
[docs]
class DirSource(enum.Enum):
    """
    Enum class representing different sources for directories.
    Used by `get_work_dir` to determine the source to look
    for a specific work directory to use.
    Options:
    - ENVFILE: Look for the directory in the .env file.
    - ENV: Look for the directory in the environment variables.
    - TEMP: Create a temporary directory.
    """
    ENVFILE = enum.auto()
    ENV = enum.auto()
    TEMP = enum.auto() 
[docs]
def get_work_dir(
    dir_source: DirSource = DirSource.ENVFILE,
    env_file_path: Path | None = None,
    use_cwd: bool = True,
) -> pathlib.Path:
    """
    Get the working directory path based on the specified directory source.
    Args:
        dir_source (DirSource, optional): The directory source to use.
        Defaults to DirSource.ENVFILE.
    Returns:
        pathlib.Path: The path to the working directory.
    Raises:
        ValueError: If the specified directory source is
        invalid or the working directory path is not found.
    """
    match dir_source:
        case DirSource.ENVFILE:
            if env_file_path is None:
                env_file_path_search = dotenv.find_dotenv(usecwd=use_cwd)
                if env_file_path_search is None:
                    msg = "The .env file could not be automatically found."
                    raise ValueError(msg)
                env_fp = Path(env_file_path_search)
            else:
                env_fp = env_file_path
            envfile_vals = dotenv.dotenv_values(
                env_fp,
            )
            if "HLSFACTORY_WORK_DIR" not in envfile_vals:
                msg = "HLSFACTORY_WORK_DIR not in .env file"
                raise ValueError(msg)
            if envfile_vals["HLSFACTORY_WORK_DIR"] is None:
                msg = "HLSFACTORY_WORK_DIR not set to a valid path in .env file"
                raise ValueError(msg)
            return pathlib.Path(envfile_vals["HLSFACTORY_WORK_DIR"])
        case DirSource.ENV:
            if "HLSFACTORY_WORK_DIR" not in os.environ:
                msg = "HLSFACTORY_WORK_DIR not in environment"
                raise ValueError(msg)
            val = os.getenv("HLSFACTORY_WORK_DIR")
            if val is None:
                msg = "HLSFACTORY_WORK_DIR not set to a valid path in environment"
                raise ValueError(msg)
            return pathlib.Path(val)
        case DirSource.TEMP:
            temp_dir = TemporaryDirectory(prefix="hlsfactory__")
            return pathlib.Path(temp_dir.name)
        case _:
            msg = f"Invalid dir_source: {dir_source}"
            raise ValueError(msg) 
    # TODO: Implement the WHICH option to use the shutil.which function
[docs]
def remove_dir_if_exists(dir_path: pathlib.Path) -> None:
    """
    Remove a directory if it exists.
    Args:
        dir_path (pathlib.Path): The path to the directory.
    Returns:
        None
    """
    if dir_path.exists():
        shutil.rmtree(dir_path) 
[docs]
def remove_and_make_new_dir_if_exists(dir_path: pathlib.Path) -> None:
    """
    Removes the directory at the given path if it exists,
    and then creates a new directory at the same path.
    Args:
        dir_path (pathlib.Path): The path to the directory.
    Returns:
        None
    """
    remove_dir_if_exists(dir_path)
    dir_path.mkdir(parents=True, exist_ok=True)