Source code for hlsfactory.opt_dsl_frontend_intel

import hashlib
import itertools
import os
import random
import re
import shutil
from pathlib import Path

from hlsfactory.framework import Design, Frontend


[docs] class ArrayPartition: # the first line as the input array_settings def __init__(self, array_settings: str): setting_lists = array_settings.split(",") self.num_of_directives = int(setting_lists[1]) self.factors = setting_lists[2].strip("[]").split() self.arr_types = setting_lists[3].strip("[]").split() # lines are actual templates to wrtie to tcl files self.directives = []
[docs] def get_flattened(self) -> list: output = [] for factor in self.factors: for array_type in self.arr_types: output.append([factor, array_type]) return output
[docs] def get_num_of_directives(self): return self.num_of_directives
[docs] def append_directives(self, line): self.directives.append(line)
[docs] def get_directives(self): return self.directives
[docs] class LoopOpt: def __init__(self, loop_settings): self.num_of_parameters = int(loop_settings.split(",")[1]) self.num_of_directives = int(loop_settings.split(",")[2]) # lines are actual templates to wrtie to tcl files self.directives = [] # factors/parameters for the tcl files in strings self.parameter_lines = []
[docs] def get_flattened(self): output = [] for line in self.parameter_lines: loop_name = line.split(",")[1] loop_pipeline = line.split(",")[2].strip() == "pipeline" loop_unroll = line.split(",")[3].strip() == "unroll" unroll_factor_lists = line.split(",")[4].strip("[]").split() unroll_factor_lists = [x.strip("[]") for x in unroll_factor_lists] for unroll_factor in unroll_factor_lists: output.append([loop_name, loop_pipeline, loop_unroll, unroll_factor]) return output
[docs] def append_parameters(self, line): self.parameter_lines.append(line)
[docs] def append_directives(self, line): self.directives.append(line)
[docs] def get_directives(self): return self.directives
[docs] def get_num_of_parameters(self): return self.num_of_parameters
[docs] def get_num_of_directives(self): return self.num_of_directives
#################################################
[docs] def parse_template( src_template: Path, ) -> tuple[list[ArrayPartition], list[LoopOpt], str]: array_partition_object_lists: list[ArrayPartition] = [] loop_opt_object_lists: list[LoopOpt] = [] static_lines = "" lines = src_template.read_text().splitlines() i = 0 while i < len(lines): line = lines[i] if line.strip().startswith("#") or line.rstrip() == "": i += 1 continue if line.strip().startswith("array_partition"): array_partition_object = ArrayPartition(line.strip()) for _x in range(array_partition_object.get_num_of_directives()): i += 1 array_partition_object.append_directives(lines[i]) array_partition_object_lists.append(array_partition_object) i += 1 continue if line.strip().startswith("loop_opt"): loop_opt_object = LoopOpt(line.strip()) for _x in range(loop_opt_object.get_num_of_parameters()): i += 1 loop_opt_object.append_parameters(lines[i]) for _x in range(loop_opt_object.get_num_of_directives()): i += 1 loop_opt_object.append_directives(lines[i]) loop_opt_object_lists.append(loop_opt_object) i += 1 continue static_lines += line + "\n" i += 1 return array_partition_object_lists, loop_opt_object_lists, static_lines
###################################################
[docs] def gen_opt(array_partition_object_lists, loop_opt_object_lists): array_partition_lines = [] for array_partition_object in array_partition_object_lists: directive_lines = array_partition_object.get_directives() array_block_lines = [] for array_settings in array_partition_object.get_flattened(): # factor is 1, no need to set directives if array_settings[0].strip() == "1": array_block_lines.append("") continue output_line = "" for line in directive_lines: temp_line = line.replace("[factor]", array_settings[0]) temp_line = temp_line.replace("[type]", array_settings[1]) output_line += temp_line + "\n" array_block_lines.append(output_line) array_partition_lines.append(array_block_lines) array_partition_lines = list(itertools.product(*array_partition_lines)) loop_opt_lines = [] for loop_opt_object in loop_opt_object_lists: lines = loop_opt_object.get_directives() directive_pipeline_lines = [] directive_unroll_lines = [] for i in range(len(lines)): if lines[i].find("set_directive_pipeline") >= 0: directive_pipeline_lines.append(lines[i]) else: directive_unroll_lines.append(lines[i]) # by default, nothing to apply to the unroll and pipeline loop_opt_block = [] for loop_opt_settings in loop_opt_object.get_flattened(): output_line = "" # need to pipeline if loop_opt_settings[1] is True: for line in directive_pipeline_lines: temp_line = line.replace("[name]", loop_opt_settings[0]) output_line += temp_line + "\n" # need to unroll if loop_opt_settings[2] is True: for line in directive_unroll_lines: temp_line = line.replace("[factor]", loop_opt_settings[3]) temp_line = temp_line.replace("[name]", loop_opt_settings[0]) output_line += temp_line + "\n" loop_opt_block.append(output_line) loop_opt_lines.append(loop_opt_block) loop_opt_lines = list(itertools.product(*loop_opt_lines)) return array_partition_lines, loop_opt_lines
### return dict[array_name, factor] ###
[docs] def get_array_partition_dic(lines: str) -> dict[str, str]: line_lists = lines.split("\n") array_dict = {} for line in line_lists: if "set_directive_array_partition" in line: array_name = line.split(" ")[-1] words = line.split() index_factor = words.index("-factor") array_factor = words[index_factor + 1] array_dict[array_name] = array_factor return array_dict
### return dict[loop_name, factor] ###
[docs] def get_loop_unroll_dic(lines: str) -> dict[str, str]: line_lists = lines.split("\n") loop_dict = {} for line in line_lists: if "set_directive_unroll" in line: loop_name = line.split(" ")[-1] loop_name = loop_name.replace('"', "") loop_name = loop_name.split("/")[-1] words = line.split() index_factor = words.index("-factor") loop_factor = words[index_factor + 1] loop_dict[loop_name] = loop_factor return loop_dict
### return list[loop_name] ###
[docs] def get_pipeline_list(lines: str) -> list[str]: line_lists = lines.split("\n") pipeline_list = [] for line in line_lists: if "set_directive_pipeline" in line: pipeline_name = line.split(" ")[-1] pipeline_name = pipeline_name.replace('"', "") pipeline_name = pipeline_name.split("/")[-1] pipeline_list.append(pipeline_name) return pipeline_list
[docs] def get_kernel(hls_template: Path) -> (str, str): lines = hls_template.read_text().splitlines() kernel_name = str kernel_c = str for line in lines: if "set_top" in line: kernel_name = line.split(" ")[-1] # print ("kernel name is", kernel_name) ######################################################################################### # Warning: It assumes that only one .c file exsits and that is the source of the kernel # ######################################################################################### if ".c" in line and "add_file" in line and "-tb" not in line: kernel_c = line.split(" ")[-1].split("/")[-1] return kernel_name, kernel_c
polybench_header_text = [ "#include <unistd.h> \n", "#include <string.h> \n", "#include <math.h> \n", "#include <HLS/stdio.h> \n", "#include <HLS/hls.h> \n", "\n", ]
[docs] def polybench_copy(source_dir: Path, target_dir: Path, kernel_name: str) -> None: # modify the header file and copy all the other files to the target folder for file_name in os.listdir(source_dir): if file_name == f"{kernel_name}.h": with (source_dir / file_name).open("r") as f: lines = f.readlines() new_lines = [] for line in lines: if "#include" not in line.strip(): new_lines.append(line) new_lines = polybench_header_text + new_lines with (target_dir / file_name).open("w") as target_f: target_f.writelines(new_lines) continue shutil.copy(source_dir / file_name, target_dir / file_name)
#### return the name list of annotated C codes #######
[docs] def generate_annotate_c( design_dir: Path, array_partition_lines, loop_opt_lines, static_lines, work_dir: Path, kernel_name: str, kernel_file: str, random_sample=False, random_sample_num=10, ) -> list[Design]: line_combos_all = list(itertools.product(array_partition_lines, loop_opt_lines)) if random_sample: line_combos_all = random.sample(line_combos_all, random_sample_num) # else: # line_combos_all = line_combos_all ##################################################################################################### # @Note: I do not use any hash here to generate the annoated C, simply put a counter at the end # ##################################################################################################### ct = 0 kernel_f = open(kernel_file) design_list = [] for a_line, l_line in line_combos_all: ct += 1 a_l = "" l_l = "" for x in a_line: a_l += x for x in l_line: l_l += x dir = work_dir / str(design_dir).split("/")[-2] / (kernel_name + "_" + str(ct)) if dir.exists(): shutil.rmtree(dir) dir.mkdir(parents=True) # print ("design dir is",str(design_dir).split('/')[-2]) # copy and modify the files to the working folder polybench_copy(design_dir, dir, kernel_name) array_partition_dic = get_array_partition_dic(a_l + l_l + static_lines) loop_unroll_dic = get_loop_unroll_dic(a_l + l_l + static_lines) pipeline_list = get_pipeline_list(a_l + l_l + static_lines) kernel_f.seek(0, 0) new_filename = dir / (kernel_name + "_" + str(ct) + ".c") new_f = open(new_filename, "w+") if "polybench" in str(design_dir): for line in kernel_f: new_line = line ### This is not safe since patterns matches only with void type function #### if "void " + kernel_name in line: new_line = "component " + new_line # insert array partition if kernel_name not in line and "DATA_TYPE" in line: array_name = line.split(" ")[-1] array_name = array_name.split("[")[0] if array_name in array_partition_dic: new_f.write( "hls_numbanks(" + array_partition_dic[array_name] + ")\n", ) ############################################################################################## # @Note: To detect a for loop label, simply detecting ':', it is unsafe but usable for now # ############################################################################################## elif ":" in line: match = re.search(r"(\w+):", line) if match: loop_name = match.group(1) if loop_name in loop_unroll_dic: new_f.write( "#pragma unroll " + loop_unroll_dic[loop_name] + "\n", ) if loop_name not in pipeline_list: new_f.write("#pragma disable_loop_pipelining\n") new_line = new_line.replace(loop_name, "") new_line = new_line.replace(":", "") new_line = new_line.replace("register", "") new_f.write(new_line) new_design = Design(new_filename, dir) design_list.append(new_design) ############Machsuite############### # numbanks( ) and array partition for machsuite elif "machsuite" in str(design_dir): # component_names= ("void " + kernel_name, "int " +kernel_name) for num, line in enumerate(kernel_f, 1): # iterate for functions only# oldline = line new_line = line is_function = re.search(r"\bint\b.*\b\(\b", line) or re.search( r"\bvoid\b.*\b\(\b", line, ) # if any(s in line for s in component_names): if is_function: params = re.findall(r"\((.*?)\)", line) stringc = "".join(map(str, params)) split_words = stringc.split(",") # print ("inside component function", kernel_name) to_replace = [] array_index = [] data_type = [] array_name = [] for word in split_words: word = word.lstrip() for i in range(len(word)): if word[i] == "[": to_replace1 = word array_index1 = word[i + 1] # the letter next to [ data_type1 = word.split()[0] array_name_is = re.findall( r"\ (.*?)\[", word, ) # this is a list array_name1 = "".join( map(str, array_name_is), ) # convert to string # print ("word... is",to_replace1,array_index1,data_type1,array_name_is,array_name1) to_replace.append(to_replace1) array_index.append(array_index1) data_type.append(data_type1) array_name.append(array_name1) newline = line # if array_name1 in array_partition_dic: for j in range(len(to_replace)): newline = newline.replace( to_replace[j], " hls_avalon_slave_memory_argument(" + array_index[j] + ") hls_numbanks(" + "8" + ")" + " hls_bankwidth(sizeof(" + data_type[j] + "))" + " " + data_type[j] + " *" + array_name[j], ) # print ("kernel name is ", kernel_name) # print ("Old line was", oldline) # print ("New line is", newline) new_f.write(newline) # loop unrolls for machsuite if ":" in line: match = re.search(r"(\w+):", line) or re.search(r"(\w+) :", line) if match: loop_name = match.group(1) if loop_name in loop_unroll_dic: new_f.write( "#pragma unroll " + loop_unroll_dic[loop_name] + "\n", ) if loop_name not in pipeline_list: new_f.write("#pragma disable_loop_pipelining\n") new_line = new_line.replace(loop_name, "") new_line = new_line.replace(":", "") new_line = new_line.replace("register", "") if not is_function: new_f.write(new_line) new_design = Design(new_filename, dir) design_list.append(new_design) new_f.close() kernel_f.close() return design_list
[docs] class OptDSLFrontendIntel(Frontend): name = "OptDSLFrontendIntel" def __init__( self, work_dir: Path, random_sample=False, random_sample_num=10, ) -> None: self.work_dir = work_dir self.random_sample = random_sample self.random_sample_num = random_sample_num # TODO: add option for random seed to make deterministic
[docs] def execute(self, design: Design, timeout: float | None) -> list[Design]: # TODO: add more runtime checks for required entry point files opt_template_fp = design.dir / "opt_template.tcl" ##################################################################################################### # @Note: It takes hls_template.tcl as the input to seek for the top function to synthesize # ##################################################################################################### hls_template = design.dir / "hls_template.tcl" ### set_top [kernel_name] from hls_template.tcl ### kernel_name, kernel_c = get_kernel(hls_template) kernel_file = design.dir / "src" / kernel_c design_dir = design.dir / "src" ( array_partition_object_lists, loop_opt_object_lists, static_lines, ) = parse_template(opt_template_fp) array_partition_lines, loop_opt_lines = gen_opt( array_partition_object_lists, loop_opt_object_lists, ) design_list = generate_annotate_c( design_dir, array_partition_lines, loop_opt_lines, static_lines, self.work_dir, kernel_name, kernel_file, self.random_sample, self.random_sample_num, ) return design_list