Source code for hlsfactory.opt_dsl_frontend

import hashlib
import itertools
import json
import random
import time
from pathlib import Path

import psutil

from hlsfactory.framework import Design, Frontend
from hlsfactory.utils import log_execution_time_to_file


class ArrayPartition:
    """One ``array_partition`` block read from an optimization template.

    The header record is comma-separated and read positionally: field 1 is
    the number of directive template lines that follow the header, field 2
    is a bracketed, whitespace-separated list of partition factors and
    field 3 is a bracketed, whitespace-separated list of partition types.
    Field 0 is not inspected here.

    Attributes are kept as strings because they are substituted verbatim
    into directive templates.
    """

    # the first line as the input array_settings
    def __init__(self, array_settings: str) -> None:
        """Parse the header record.

        Raises:
            IndexError: if the record has fewer than four comma fields.
            ValueError: if field 1 is not an integer.
        """
        setting_lists = array_settings.split(",")
        self.num_of_directives = int(setting_lists[1])
        self.factors = self._split_bracket_list(setting_lists[2])
        self.arr_types = self._split_bracket_list(setting_lists[3])
        # lines are actual templates to write to tcl files
        self.directives = []

    @staticmethod
    def _split_bracket_list(field: str) -> list[str]:
        """Split a ``[a b c]`` field into its whitespace-separated tokens.

        Surrounding whitespace is removed *before* the brackets are
        stripped; otherwise a field such as ``" [1 2 4]"`` (space after the
        comma) would keep its brackets and yield tokens like ``"[1"``.
        """
        return field.strip().strip("[]").split()

    def get_flattened(self):
        """Return every ``[factor, type]`` pair, factors varying slowest."""
        return [
            [factor, array_type]
            for factor in self.factors
            for array_type in self.arr_types
        ]

    def get_num_of_directives(self):
        """Return the directive-line count declared in the header."""
        return self.num_of_directives

    def append_directives(self, line):
        """Record one directive template line belonging to this block."""
        self.directives.append(line)

    def get_directives(self):
        """Return the directive template lines collected so far."""
        return self.directives
class LoopOpt:
    """One ``loop_opt`` block read from an optimization template.

    The header record is comma-separated; field 1 is the number of
    parameter records that follow and field 2 is the number of directive
    template lines after those.  Parameter records and directive lines are
    stored as raw strings and interpreted lazily by ``get_flattened``.
    """

    def __init__(self, loop_settings: str):
        """Parse the two counts out of the header record."""
        header_fields = loop_settings.split(",")
        self.num_of_parameters = int(header_fields[1])
        self.num_of_directives = int(header_fields[2])
        # lines are actual templates to write to tcl files
        self.directives = []
        # factors/parameters for the tcl files in strings
        self.parameter_lines = []

    def get_flattened(self):
        """Expand the parameter records into one entry per unroll factor.

        Each entry is ``[loop_name, pipeline, unroll, unroll_factor]`` where
        ``pipeline`` / ``unroll`` are booleans telling whether fields 2 / 3
        of the record equal ``"pipeline"`` / ``"unroll"`` once stripped, and
        ``unroll_factor`` is a string token taken from the bracketed,
        whitespace-separated list in field 4.  The loop name (field 1) is
        used exactly as written.
        """
        flattened = []
        for record in self.parameter_lines:
            fields = record.split(",")
            name = fields[1]
            wants_pipeline = fields[2].strip() == "pipeline"
            wants_unroll = fields[3].strip() == "unroll"
            # Brackets are stripped from the whole field and again from each
            # token, so stray brackets glued to a token are tolerated.
            factor_tokens = [
                token.strip("[]") for token in fields[4].strip("[]").split()
            ]
            flattened.extend(
                [name, wants_pipeline, wants_unroll, factor]
                for factor in factor_tokens
            )
        return flattened

    def append_parameters(self, line):
        """Record one raw parameter record."""
        self.parameter_lines.append(line)

    def append_directives(self, line):
        """Record one directive template line."""
        self.directives.append(line)

    def get_directives(self):
        """Return the directive template lines collected so far."""
        return self.directives

    def get_num_of_parameters(self):
        """Return the parameter-record count declared in the header."""
        return self.num_of_parameters

    def get_num_of_directives(self):
        """Return the directive-line count declared in the header."""
        return self.num_of_directives
#################################################
def parse_template(
    src_template: Path,
) -> tuple[list[ArrayPartition], list[LoopOpt], str]:
    """Split an optimization template into its three kinds of content.

    The file is scanned line by line:

    * blank lines and lines whose first non-blank character is ``#`` are
      dropped;
    * a line starting with ``array_partition`` opens an ``ArrayPartition``
      block and is followed by as many directive lines as its header
      declares;
    * a line starting with ``loop_opt`` opens a ``LoopOpt`` block and is
      followed by its declared number of parameter records, then its
      declared number of directive lines;
    * every other line is static text and is kept verbatim.

    Returns:
        ``(array_partition_blocks, loop_opt_blocks, static_text)`` where
        ``static_text`` is the static lines joined back together, each
        terminated by a newline.

    Raises:
        IndexError: if a block header declares more following lines than
            the file contains.
    """
    array_blocks: list[ArrayPartition] = []
    loop_blocks: list[LoopOpt] = []
    static_parts: list[str] = []

    template_lines = src_template.read_text().splitlines()
    cursor = 0
    while cursor < len(template_lines):
        current = template_lines[cursor]
        stripped = current.strip()

        if stripped == "" or stripped.startswith("#"):
            # Comment or blank line: nothing to keep.
            pass
        elif stripped.startswith("array_partition"):
            array_block = ArrayPartition(stripped)
            # The cursor is moved onto each consumed line so that the shared
            # advance at the bottom steps past the whole block.
            for _ in range(array_block.get_num_of_directives()):
                cursor += 1
                array_block.append_directives(template_lines[cursor])
            array_blocks.append(array_block)
        elif stripped.startswith("loop_opt"):
            loop_block = LoopOpt(stripped)
            for _ in range(loop_block.get_num_of_parameters()):
                cursor += 1
                loop_block.append_parameters(template_lines[cursor])
            for _ in range(loop_block.get_num_of_directives()):
                cursor += 1
                loop_block.append_directives(template_lines[cursor])
            loop_blocks.append(loop_block)
        else:
            # Static text keeps its original (unstripped) form.
            static_parts.append(current + "\n")

        cursor += 1

    return array_blocks, loop_blocks, "".join(static_parts)
def gen_opt(
    array_partition_object_lists,
    loop_opt_object_lists,
):
    """Expand parsed template blocks into every combination of directives.

    For each array-partition object one *option block* is built: the list
    of alternative directive texts obtained by substituting ``[factor]``
    and ``[type]`` into its directive templates for every flattened
    ``[factor, type]`` setting.  A factor of ``"1"`` needs no directive and
    contributes the empty string.

    For each loop-opt object the option block starts with the empty string
    (apply nothing) and gains one text per flattened setting: directive
    templates containing ``set_directive_pipeline`` are used when the
    pipeline flag is ``True``, all remaining templates when the unroll flag
    is ``True``.

    Identical alternatives inside one option block are collapsed (first
    occurrence kept).  Without this, factor ``1`` is emitted once per
    partition type, a loop setting that enables nothing repeats the
    baseline, and a pipeline-only setting is repeated for every unroll
    factor, all of which yield byte-identical configurations further down.

    Args:
        array_partition_object_lists: objects exposing ``get_directives()``
            and ``get_flattened()`` returning ``[factor, type]`` entries.
        loop_opt_object_lists: objects exposing ``get_directives()`` and
            ``get_flattened()`` returning
            ``[name, pipeline, unroll, factor]`` entries.

    Returns:
        ``(array_partition_lines, loop_opt_lines)``; each is a list of
        tuples produced by ``itertools.product`` over the option blocks,
        one tuple element per input object.  With no input objects the
        list holds a single empty tuple.
    """
    array_option_blocks = []
    for array_partition_object in array_partition_object_lists:
        directive_lines = array_partition_object.get_directives()
        array_block_lines = []
        for array_settings in array_partition_object.get_flattened():
            factor, array_type = array_settings[0], array_settings[1]
            # factor is 1, no need to set directives
            if factor.strip() == "1":
                array_block_lines.append("")
                continue
            array_block_lines.append(
                "".join(
                    line.replace("[factor]", factor).replace("[type]", array_type)
                    + "\n"
                    for line in directive_lines
                ),
            )
        # dict.fromkeys keeps first-seen order while dropping repeats.
        array_option_blocks.append(list(dict.fromkeys(array_block_lines)))
    array_partition_lines = list(itertools.product(*array_option_blocks))

    loop_option_blocks = []
    for loop_opt_object in loop_opt_object_lists:
        directives = loop_opt_object.get_directives()
        directive_pipeline_lines = [
            line for line in directives if "set_directive_pipeline" in line
        ]
        directive_unroll_lines = [
            line for line in directives if "set_directive_pipeline" not in line
        ]

        # by default, nothing to apply to the unroll and pipeline
        loop_opt_block = [""]
        for loop_opt_settings in loop_opt_object.get_flattened():
            loop_name = loop_opt_settings[0]
            output_line = ""
            # need to pipeline
            if loop_opt_settings[1] is True:
                for line in directive_pipeline_lines:
                    output_line += line.replace("[name]", loop_name) + "\n"
            # need to unroll
            if loop_opt_settings[2] is True:
                unroll_factor = loop_opt_settings[3]
                for line in directive_unroll_lines:
                    temp_line = line.replace("[factor]", unroll_factor)
                    output_line += temp_line.replace("[name]", loop_name) + "\n"
            loop_opt_block.append(output_line)
        loop_option_blocks.append(list(dict.fromkeys(loop_opt_block)))
    loop_opt_lines = list(itertools.product(*loop_option_blocks))

    return array_partition_lines, loop_opt_lines
def generate_opt_sources(
    array_partition_lines,
    loop_opt_lines,
    static_lines,
    random_sample: bool = False,
    random_sample_num: int = 10,
    random_sample_seed: int = 42,
) -> tuple[list[str], list[dict]]:
    """Assemble Tcl source text and a config record for each combination.

    Every pairing of one array-partition combination with one loop-opt
    combination becomes one source: ``static_lines`` followed by a newline,
    then each array-partition text and each loop-opt text, each followed
    by a newline.

    Args:
        array_partition_lines: iterable of tuples of directive text.
        loop_opt_lines: iterable of tuples of directive text.
        static_lines: text placed at the top of every generated source.
        random_sample: when true, keep only a random subset of the
            combinations.
        random_sample_num: subset size; clamped to the number of available
            combinations.
        random_sample_seed: seed for the subset selection.

    Returns:
        ``(opt_tcl_sources, opt_configs)``, two parallel lists.  Each config
        is a dict with keys ``static_lines``, ``array_partition_lines`` and
        ``loop_opt_lines`` (in that order).
    """
    line_combos_all = list(itertools.product(array_partition_lines, loop_opt_lines))

    if random_sample:
        # A private generator yields the same sample as seeding the module
        # level one, but leaves the process-wide random state untouched for
        # any other code that relies on it.
        rng = random.Random(random_sample_seed)
        sample_size = min(random_sample_num, len(line_combos_all))
        line_combos_all = rng.sample(line_combos_all, sample_size)

    opt_tcl_sources: list[str] = []
    opt_configs: list[dict] = []
    for a_line, l_line in line_combos_all:
        array_texts = list(a_line)
        loop_texts = list(l_line)

        pieces = [static_lines + "\n"]
        pieces.extend(text + "\n" for text in array_texts)
        pieces.extend(text + "\n" for text in loop_texts)
        opt_tcl_sources.append("".join(pieces))

        opt_configs.append(
            {
                "static_lines": static_lines,
                "array_partition_lines": array_texts,
                "loop_opt_lines": loop_texts,
            },
        )

    return opt_tcl_sources, opt_configs
class OptDSLFrontend(Frontend):
    """Frontend that expands a design's ``opt_template.tcl`` into variants.

    Each generated combination of directives becomes a copy of the input
    design containing an ``opt.tcl`` with the directive text and an
    ``opt_config.json`` describing the combination.
    """

    name = "OptDSLFrontend"

    def __init__(
        self,
        work_dir: Path,
        random_sample: bool = False,
        random_sample_num: int = 10,
        random_sample_seed: int = 42,
        log_execution_time: bool = True,
    ) -> None:
        """Store the configuration; no work is done at construction time.

        The three ``random_sample*`` values are forwarded unchanged to
        ``generate_opt_sources``; ``log_execution_time`` toggles the timing
        records written by ``execute``.
        """
        self.work_dir = work_dir
        self.random_sample = random_sample
        self.random_sample_num = random_sample_num
        self.random_sample_seed = random_sample_seed
        self.log_execution_time = log_execution_time

    def execute(self, design: Design, timeout: float | None = None) -> list[Design]:
        """Create one design copy per generated optimization source.

        The copies are placed next to ``design`` (same parent directory) and
        named ``<design.name>_opt_<md5 of the opt.tcl text>``.  ``timeout``
        is accepted but not used by this implementation.

        Returns:
            The newly created designs, in generation order.
        """
        started = time.perf_counter()

        array_blocks, loop_blocks, static_lines = parse_template(
            design.dir / "opt_template.tcl",
        )
        array_partition_lines, loop_opt_lines = gen_opt(array_blocks, loop_blocks)
        opt_sources, opt_configs = generate_opt_sources(
            array_partition_lines,
            loop_opt_lines,
            static_lines,
            self.random_sample,
            self.random_sample_num,
            self.random_sample_seed,
        )

        variants = []
        for source_text, config in zip(opt_sources, opt_configs, strict=False):
            # The digest only serves as a content-derived directory suffix.
            digest = hashlib.md5(source_text.encode()).hexdigest()
            variant = design.copy_and_rename_to_new_parent_dir(
                f"{design.name}_opt_{digest}",
                design.dir.parent,
            )

            (variant.dir / "opt.tcl").write_text(source_text)
            (variant.dir / "opt_config.json").write_text(
                json.dumps(config, indent=4),
            )
            variants.append(variant)

            # Per-variant record: time elapsed since execute() started.
            now = time.perf_counter()
            if self.log_execution_time:
                log_execution_time_to_file(variant.dir, self.name, started, now)

        # Overall record, written into the source design's directory.
        finished = time.perf_counter()
        if self.log_execution_time:
            log_execution_time_to_file(design.dir, self.name, started, finished)

        return variants
class OptDSLPassthroughFrontend(Frontend):
    """Frontend that produces a single copy of a design with an empty ``opt.tcl``.

    No template is read; the copy carries no optimization directives.
    """

    name = "OptDSLPassthroughFrontend"

    def __init__(
        self,
        work_dir: Path,
    ) -> None:
        """Remember the working directory; nothing else is configured."""
        self.work_dir = work_dir

    def execute(self, design: Design, timeout: float | None = None) -> list[Design]:
        """Copy ``design`` to ``<design.name>_opt_passthrough`` beside it.

        An empty ``opt.tcl`` is written into the copy and the elapsed time
        is logged into both the copy's and the original design's directory.
        ``timeout`` is accepted but not used by this implementation.

        Returns:
            A one-element list holding the copied design.
        """
        started = time.perf_counter()

        passthrough = design.copy_and_rename_to_new_parent_dir(
            f"{design.name}_opt_passthrough",
            design.dir.parent,
        )
        (passthrough.dir / "opt.tcl").write_text("")
        produced = [passthrough]

        # Both records share the same end timestamp.
        finished = time.perf_counter()
        log_execution_time_to_file(passthrough.dir, self.name, started, finished)
        log_execution_time_to_file(design.dir, self.name, started, finished)

        return produced