Created
January 30, 2022 11:01
-
-
Save Semnodime/4d085ae6b3de5e874b528cb1beb14449 to your computer and use it in GitHub Desktop.
Script to preprocess IQ-signals in-place: Remove noise and combine nearby signal parts into one.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import numpy as np | |
# One-line summary of the tool; also passed to argparse as the program description.
description = (
    "Script to preprocess IQ-signals in-place: "
    "Remove noise and combine nearby signal parts into one."
)
def list_strong_signal_indices(signal, threshold: int):
    """
    Return the row indices of all samples whose I-part magnitude is not smaller than the threshold.

    :param signal: two-dimensional numpy array of the IQ-signal; column 0 is read as the I-part.
    :param threshold: integer which specifies the minimum magnitude of the I-part sufficient to let the sample pass.
    :return: one-dimensional numpy array of integer row indices in ascending order.
    """
    # Only the I-part decides; take a view of column 0 instead of splitting off an unused Q-part.
    in_phase = signal[:, 0]
    # Two one-sided comparisons rather than abs(): abs() of the most negative
    # fixed-width integer (e.g. -32768 for int16) wraps around and would be missed.
    is_strong = (in_phase >= threshold) | (in_phase <= -threshold)
    return np.flatnonzero(is_strong)
def indices_to_intervals(indices):
    """
    Collapse indices into inclusive intervals, one for every run of successive indices.

    A run continues as long as each index is exactly one larger than its predecessor.

    :param indices: iterable integer indices
    :return: generator of inclusive (first, last) interval tuples; nothing is yielded for empty input.
    Example:
    [1, 2, 3, 4, 10, 11, 15] -> yields (1, 4), (10, 11), (15, 15)
    """
    remaining = iter(indices)
    try:
        run_start = next(remaining)
    except StopIteration:
        # No indices at all, hence no intervals.
        return
    run_end = run_start
    for index in remaining:
        if index != run_end + 1:
            # The run is broken: emit it and open a new one at the current index.
            yield run_start, run_end
            run_start = index
        run_end = index
    # The last run is still open once the input is exhausted.
    yield run_start, run_end
def connect_nearby_intervals(intervals, threshold: int):
    """
    Combine intervals so that no two remaining intervals are separated by a gap of at most the threshold.

    The gap between two intervals is the number of indices strictly between them,
    so directly adjacent intervals (gap 0) are always merged.

    :param intervals: iterable of inclusive (start, stop) intervals, expected to be ordered by start.
    :param threshold: integer which specifies the maximum gap that will be closed.
    :return: generator of combined intervals; nothing is yielded for empty input.
    Example:
    threshold=4
    [(1, 2), (4, 5), (10, 11), (17, 95), (100, 120)] -> yields (1, 11), (17, 120)
    """
    remaining = iter(intervals)
    try:
        start, stop = next(remaining)
    except StopIteration:
        return
    for next_start, next_stop in remaining:
        # gap = next_start - stop - 1; merge when gap <= threshold.
        if next_start <= stop + threshold + 1:
            # Keep the larger end so an interval nested in the current one cannot shrink it.
            stop = max(stop, next_stop)
        else:
            yield start, stop
            start, stop = next_start, next_stop
    yield start, stop
def remove_small_intervals(intervals, threshold: int):
    """
    Pass through every interval whose size reaches the threshold and drop the rest.

    The size of an inclusive interval (start, stop) is stop - start + 1.

    :param intervals: iterable interval tuples.
    :param threshold: integer which specifies the minimum size a passing interval can have.
    :return: generator of interval tuples.
    """
    for span in intervals:
        first, last = span[0], span[1]
        size = last - first + 1
        if size < threshold:
            # Too short to count as a signal part.
            continue
        yield span
def filter_signal_by_intervals(signal, intervals, default_value: int = 0):
    """
    Reset the signal to the given default value outside the given intervals.

    :param signal: numpy array of the IQ-signal in shape(-1, 2); it is modified in-place.
    :param intervals: iterable of inclusive (start, stop) integer intervals, expected to be ordered by start,
        where the signal is left untouched.
    :param default_value: integer value to which the filtered signal will be reset outside the given intervals.
    :return: None, as the signal is modified in-place.
    """
    # First row that is not yet covered by a processed interval or already reset.
    cursor = 0
    for start, stop in intervals:
        # Reset the gap in front of this interval; the slice is empty when there is no gap.
        signal[cursor:start] = default_value
        # Never step backwards: an interval nested in an earlier one must not
        # expose rows of that earlier interval to a later reset.
        cursor = max(cursor, stop + 1)
    # Everything behind the last kept interval is reset as well.
    signal[cursor:] = default_value
def rewrite_file(path: str, min_magnitude: int, max_gap_to_close: int, min_size: int):
    """
    Rewrite .complex32s file on disk based on given parameters.

    The file is memory-mapped as int16 values and viewed as rows of two values (I, Q).
    Samples with a strong I-part are located, grouped into intervals, optionally merged
    across small gaps and filtered by size; everything outside the surviving intervals
    is reset to 0 directly in the file.

    :param path: string which specifies the path to the .complex32s formatted signal
    :param min_magnitude: integer minimum magnitude of the I-part for a sample to count as signal;
        0 leaves the file untouched.
    :param max_gap_to_close: integer maximum gap between two signal parts that will be closed;
        values not greater than 0 skip this step.
    :param min_size: integer minimum size a signal part must have after merging to be kept;
        values not greater than 1 skip this step.
    :return: None, as the file is modified in-place.
    """
    # Skip processing if the file would remain unchanged
    if min_magnitude == 0:
        return

    # Keep a handle on the mapping itself so it can be flushed explicitly below.
    mapped_samples = np.memmap(path, mode='r+', dtype=np.int16)
    signal_IQ = np.reshape(mapped_samples, (-1, 2))

    signal_sites = list_strong_signal_indices(signal_IQ, threshold=min_magnitude)
    intervals = indices_to_intervals(signal_sites)

    # Skip closing gaps if the signal would remain unchanged
    if max_gap_to_close > 0:
        intervals = connect_nearby_intervals(intervals, threshold=max_gap_to_close)

    # Skip removing intervals if the signal would remain unchanged
    if min_size > 1:
        intervals = remove_small_intervals(intervals, threshold=min_size)

    filter_signal_by_intervals(signal_IQ, intervals=intervals)

    # Write the changes to disk now instead of relying on the mapping being garbage collected.
    mapped_samples.flush()
if __name__ == '__main__':
    import argparse

    # Integer options of the command line interface as (flags, destination, help text);
    # all of them default to 0, which disables the corresponding processing step.
    integer_options = (
        (
            ('-m', '--min-magnitude'),
            'min_magnitude',
            'integer which specifies the minimum magnitude of the I-part sufficient to let the sample pass.',
        ),
        (
            ('-g', '--max-gap'),
            'max_gap',
            'integer which specifies the maximum gap that will be closed.',
        ),
        (
            ('-s', '--min-size'),
            'min_size',
            "integer which specifies the minimum size of signal parts after they've been combined.",
        ),
    )

    # Build the parser: one positional file path followed by the integer options.
    cli = argparse.ArgumentParser(description=description)
    cli.add_argument('file', type=str, help='string which specifies the path to the signal.complex32s file.')
    for flags, destination, help_text in integer_options:
        cli.add_argument(*flags, dest=destination, type=int, default=0, help=help_text)
    options = cli.parse_args()

    # Filter the signal file in-place according to the parsed parameters.
    rewrite_file(
        path=options.file,
        min_magnitude=options.min_magnitude,
        max_gap_to_close=options.max_gap,
        min_size=options.min_size,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Demonstration
Run
python3 filter_signal.py samples.complex32s --min-magnitude 64 --max-gap 80 --min-size 80