Skip to content

Instantly share code, notes, and snippets.

@Semnodime
Created January 30, 2022 11:01
Show Gist options
  • Save Semnodime/4d085ae6b3de5e874b528cb1beb14449 to your computer and use it in GitHub Desktop.
Save Semnodime/4d085ae6b3de5e874b528cb1beb14449 to your computer and use it in GitHub Desktop.
Script to preprocess IQ-signals in-place: Remove noise and combine nearby signal parts into one.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
description = """Script to preprocess IQ-signals in-place: Remove noise and combine nearby signal parts into one."""
def list_strong_signal_indices(signal, threshold: int):
"""
Return a list of all indices where the magnitude of the I-part is not smaller than the threshold.
:param signal: numpy array.
:param threshold: integer which specifies the minimum magnitude of the I-part sufficient to let the sample pass.
:return: generator of integer indices
"""
ifile_I, ifile_Q = np.hsplit(signal, 2)
return np.where((ifile_I >= threshold) | (ifile_I <= -threshold))[0]
def indices_to_intervals(indices):
"""
Iterate over indices and yield an interval for every sequence of successive indices.
:param indices: iterable integer indices
:return: generator of interval tuples
Example:
[1, 2, 3, 4, 10, 11, 15] -> (x for x in [(1,4), (10, 11), (15, 15)])
"""
iterator = (i for i in indices)
try:
rising_flank = next(iterator)
falling_flank = rising_flank
except StopIteration:
return
for r2 in iterator:
if r2 != falling_flank + 1:
yield rising_flank, falling_flank
rising_flank = r2
falling_flank = r2
yield rising_flank, falling_flank
def connect_nearby_intervals(intervals, threshold: int):
"""
Iterate over intervals and combine them so that no two intervals with a gap smaller than the threshold remains.
:param intervals: iterable intervals.
:param threshold: integer which specifies the maximum gap that will be closed.
:return: generator of combined intervals.
Example:
nearby=4
[(1, 2), (4, 5), (10, 11), (17,95), (100, 120)] -> (x for x in [(1,11), (17, 120)])
"""
iterator = (i for i in intervals)
try:
start, stop = next(iterator)
except StopIteration:
return
for i in iterator:
start2, stop2 = i
if start2 <= stop + threshold + 1:
stop = stop2
else:
yield start, stop
start = start2
stop = stop2
yield start, stop
def remove_small_intervals(intervals, threshold: int):
"""
Yield all intervals but those which are smaller than the threshold.
:param intervals: iterable interval tuples.
:param threshold: integer which specifies the minimum size a passing interval can have.
:return: generator of interval tuples.
"""
for i in intervals:
if i[1] - i[0] + 1 >= threshold:
yield i
def filter_signal_by_intervals(signal, intervals, default_value: int = 0):
"""
Reset the signal to the given default value outside the given intervals.
:param signal: numpy array of the IQ-signal in shape(-1, 2).
:param intervals: iterable integer intervals where the signal untouched
:param default_value: integer value to which the filtered signal will be reset outside the given intervals.
:return: None, as the signal is modified in-place.
"""
start = \
0
for start2, stop2 in intervals:
signal[start:start2][:] = default_value
start = stop2 + 1
signal[start:][:] = default_value
def rewrite_file(path: str, min_magnitude: int, max_gap_to_close: int, min_size: int):
"""
Rewrite .complex32s file on disk based on given parameters.
:param path: string which specifies the path to the .complex32s formatted signal
:param min_magnitude:
:param max_gap_to_close:
:param min_size:
:return:
"""
"""
:param path:
:return:
"""
# Skip processing if the file would remain unchanged
if min_magnitude == 0:
return
signal_IQ = np.memmap(path, mode='r+', dtype=np.int16)
signal_IQ = np.reshape(signal_IQ, (-1, 2))
signal_sites = list_strong_signal_indices(signal_IQ, threshold=min_magnitude)
intervals = indices_to_intervals(signal_sites)
# Skip closing gaps if the signal would remain unchanged
if max_gap_to_close > 0:
intervals = connect_nearby_intervals(intervals, threshold=max_gap_to_close)
# Skip removing intervals if the signal would remain unchanged
if min_size > 1:
intervals = remove_small_intervals(intervals, threshold=min_size)
filter_signal_by_intervals(signal_IQ, intervals=intervals)
if __name__ == '__main__':
import argparse
# Parse command line arguments
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
'file',
type=str,
help='string which specifies the path to the signal.complex32s file.'
)
parser.add_argument(
'-m', '--min-magnitude',
dest='min_magnitude',
type=int,
default=0,
help='integer which specifies the minimum magnitude of the I-part sufficient to let the sample pass.'
)
parser.add_argument(
'-g', '--max-gap',
dest='max_gap',
type=int,
default=0,
help='integer which specifies the maximum gap that will be closed.'
)
parser.add_argument(
'-s', '--min-size',
dest='min_size',
type=int,
default=0,
help="integer which specifies the minimum size of signal parts after they've been combined."
)
args = parser.parse_args()
# Filter signal file according to given parameters
rewrite_file(path=args.file, min_magnitude=args.min_magnitude, max_gap_to_close=args.max_gap, min_size=args.min_size)
@Semnodime
Copy link
Author

Semnodime commented Jan 30, 2022

Demonstration

Run python3 filter_signal.py samples.complex32s --min-magnitude 64 --max-gap 80 --min-size 80

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment