Source code for caf_verilog.caf

import numpy as np
from sk_dsp_comm import digitalcom as dc
from . caf_verilog_base import CafVerilogBase
from . xcorr import XCorr
from . reference_buffer import ReferenceBuffer
from . capture_buffer import CaptureBuffer
from . freq_shift import FreqShift
from . __version__ import __version__
from jinja2 import Environment, FileSystemLoader, Template
import os
from shutil import copy
from . quantizer import quantize
from . io_helper import write_buffer_values
from . quantizer import bin_num
from . sig_gen import phase_increment
from math import log2, ceil


[docs]class CAF(CafVerilogBase): def __init__(self, reference, received, foas, ref_i_bits=12, ref_q_bits=0, rec_i_bits=12, rec_q_bits=0, fs=625e3, n_bits=8, pipeline=True, output_dir='.'): """ :param reference: :param received: :param ref_i_bits: :param ref_q_bits: :param output_dir: """ self.reference = reference self.received = received if not len(self.reference) == (len(self.received) / 2): raise ValueError("Received signal must be twice the length of the reference signal") self.foas = foas self.fs = fs self.n_bits = n_bits self.ref_i_bits = ref_i_bits self.ref_q_bits = ref_q_bits if ref_q_bits else self.ref_i_bits self.rec_i_bits = rec_i_bits self.rec_q_bits = rec_q_bits if rec_q_bits else self.rec_i_bits self.ref_quant = quantize(self.reference, self.ref_i_bits, self.ref_q_bits) self.rec_quant = quantize(self.received, self.rec_i_bits, self.rec_q_bits) self.test_value_filename = '%s_input_values.txt' % (self.module_name()) self.test_output_filename = '%s_output_values.txt' % (self.module_name()) self.phase_increment_filename = '%s_phase_increment_values.txt' % self.module_name() self.neg_shift_filename = '%s_neg_shift_values.txt' % self.module_name() self.pip = pipeline if not self.pip: raise NotImplementedError("A non-pipelined dot-product has not been implemented as of %s" % __version__) self.output_dir = output_dir self.submodules = self.gen_submodules() self.write_module()
[docs] def gen_submodules(self): submodules = dict() submodules['reference_buffer'] = ReferenceBuffer(self.reference, self.ref_i_bits, self.ref_q_bits, self.output_dir, 'ref') submodules['capture_buffer'] = CaptureBuffer(len(self.received), self.rec_i_bits, self.rec_q_bits, self.output_dir, 'cap') submodules['freq_shift'] = FreqShift(self.received, self.freq_res(), self.fs, self.n_bits, i_bits=self.rec_i_bits, q_bits=self.rec_q_bits, output_dir=self.output_dir) submodules['x_corr'] = XCorr(self.reference, self.received, self.ref_i_bits, self.ref_q_bits, self.rec_i_bits, self.rec_q_bits, pipeline=self.pip, output_dir=self.output_dir) return submodules
[docs] def freq_res(self): freqs = list() for ff in self.foas: if ff: freqs.append(abs(ff)) freqs = np.floor(np.log10(freqs)) freqs = 10 ** freqs min_res = min(freqs) return min_res
[docs] def template_dict(self, inst_name=None): t_dict = {**self.submodules['reference_buffer'].template_dict(), **self.submodules['capture_buffer'].template_dict(), **self.submodules['freq_shift'].template_dict(), **self.submodules['x_corr'].template_dict()} t_dict['%s_foa_len' % self.module_name()] = len(self.foas) t_dict['%s_foa_len_bits' % self.module_name()] = int(ceil(log2(len(self.foas)))) t_dict['%s_phase_increment_filename' % self.module_name()] = os.path.abspath(os.path.join(self.output_dir, self.phase_increment_filename)) t_dict['%s_neg_shift_filename' % self.module_name()] = os.path.abspath(os.path.join(self.output_dir, self.neg_shift_filename)) t_dict['%s_input' % self.module_name()] = os.path.abspath(os.path.join(self.output_dir, self.test_value_filename)) t_dict['%s_name' % self.module_name()] = inst_name if inst_name else '%s_tb' % self.module_name() return t_dict
[docs] def write_module(self): super(CAF, self).write_module() params_path = os.path.abspath(os.path.join(self.tb_module_path(), 'caf_state_params.v')) self.write_phase_increment_values() copy(params_path, self.output_dir)
[docs] def gen_tb(self): write_buffer_values(self.output_dir, self.test_value_filename, self.rec_quant, self.rec_i_bits, self.rec_q_bits) self.write_tb_module()
[docs] def write_tb_module(self): t_dict = self.template_dict() template_loader = FileSystemLoader(searchpath=self.tb_module_path()) env = Environment(loader=template_loader) template = env.get_template('%s_tb.v' % self.module_name()) out_tb = template.render(**t_dict) with open(os.path.join(self.output_dir, '%s_tb.v' % self.module_name()), 'w+') as tb_file: tb_file.write(out_tb)
[docs] def write_phase_increment_values(self): phase_bits = self.template_dict()['freq_shift_phase_bits'] with open(os.path.join(self.output_dir, self.phase_increment_filename), 'w+') as pi_file: for freq in self.foas: incr = phase_increment(abs(freq), phase_bits, self.fs) pi_file.write(bin_num(incr, phase_bits) + '\n') with open(os.path.join(self.output_dir, self.neg_shift_filename), 'w+') as nn_file: for freq in self.foas: nn_file.write(str(int(freq < 0)) + '\n')
[docs]def simple_caf(x, y, foas, fs): """ Produce values for a surface plot of the Complex Ambiguity Function. This function primarily supports testing values produced via sim_helper, so y is expected to be twice the length of x. The return is the CAF surface and a time delay range normalized by the sampling frequency. :param x: Use x as a reference signal. :param y: Use y as a captured signal. :param foas: Frequency offsets, provided as a list/iterable object. :param fs: Sampling frequency :return: caf_res, dt """ nlags = len(x) ztup = (nlags, len(foas)) caf_res = np.zeros(ztup) nlen = len(y) nrange = np.arange(0, nlen) dt_lags = nlags // 2 dt = np.arange(-dt_lags, dt_lags) / float(fs) for k, Df in enumerate(foas): theta = np.exp(1j*2*np.pi*nrange*Df/float(fs)) y_shift = y * theta rxy, lags = dc.xcorr(x, y_shift, nlags) caf_res[:, k] = np.abs(rxy) return caf_res, dt