Real-Time Peak Finding Algorithm Based on ReactiveX

For some reason, I have a task that should accept serial analog signals and find out all positive and negative peaks in real time. The good news is that the algorithm itself does not need to be designed as a hardware circuit. So, let’s try the real power of ReactiveX in processing data streams.

Pull Input Analog Signals

All inputs are generated by a data acquisition system (NI DAQ), the method provided by NI SDK is:

1
2
3
4
5
public IAsyncResult BeginMemoryOptimizedReadWaveform(
int samplesPerChannel,
AsyncCallback callback,
object state,
AnalogWaveform<double>[] data)

The first point is this SDK is designed for .NET 4.5, but it still uses Asynchronous Programming Model (APM), so it must be converted to Task-based Asynchronous Pattern (TAP). Next, the following strange point is it doesn’t follow C# parameter rules of method. For the static factory of Task, it requires the last parameter of the given async method shoule be state: object, but the parameters order of this one is incorrect. After swapped the order, I got:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static class AnalogMultiChannelReaderEx
{
public static Task<AnalogWaveform<double>[]> ReadAsync(
this AnalogMultiChannelReader reader,
int samplesPerChannel,
object state,
AnalogWaveform<double>[] data)
{
return Task<AnalogWaveform<double>[]>.Factory.FromAsync(
reader.BeginMemoryOptimizedReadWaveformApm,
reader.EndMemoryOptimizedReadWaveform,
samplesPerChannel,
data,
state);
}

private static IAsyncResult BeginMemoryOptimizedReadWaveformApm(
this AnalogMultiChannelReader reader,
int samplesPerChannel,
AnalogWaveform<double>[] data,
AsyncCallback callback,
object state)
{
return reader.BeginMemoryOptimizedReadWaveform(samplesPerChannel, callback, state, data);
}
}

Then the original data can be wrapped into an Observable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
var runningTask = new NI_Task();
var data = new[]
{
new AnalogWaveform<double>(Context.SamplesPerChannel),
new AnalogWaveform<double>(Context.SamplesPerChannel)
};

new[]
{
(Context.PhysicalCh1, "ch1"),
(Context.PhysicalCh2, "ch2")
}
.ForEach(x =>
runningTask.AIChannels.CreateVoltageChannel(x.Item1, x.Item2,
(AITerminalConfiguration) (-1),
-Context.DaqThrVoltage,
Context.DaqThrVoltage,
AIVoltageUnits.Volts));

runningTask.Timing.ConfigureSampleClock("",
Context.Rate,
SampleClockActiveEdge.Rising,
SampleQuantityMode.ContinuousSamples,
Context.SamplesPerChannel);

runningTask.Control(TaskAction.Verify);

var reader = new AnalogMultiChannelReader(runningTask.Stream) {SynchronizeCallbacks = true};

Observable.FromAsync(() => reader.ReadAsync(Context.SamplesPerChannel, runningTask, data))
.Repeat()
.ObserveOn(DaqScheduler)
.Do(x => ProcessData(x))
.Subscribe()
.DisposeWith(_d);

In ProcessData, the original data is spilted into a few double variables and passed to Subject.

1
2
3
4
5
6
7
var chnValue = 
data[n].Samples
.Select(x => x.Value)
.OrderBy(x => x)
.First();

Subject<Sample<double, double>>.OnNext(new Sample<double, double>(index, chnValue));

Algorithm Disscussion

In offline situation, we can observe the complete curve easily.

For example, peaks have two characteristics: height and width and the condition for treating a part of the curve as a peak is that there is a bottom following a crest. Therefore, to make this decision, the future data after the focused crest must be known.

Of course, raw data may contains noises and unexpcected peaks, we can use a threshould to filter directly.

However the task is real-time detection, I cannot predict the future data and use it. Thence I try to use the difference (i.e. trend) of curve to find out peaks. After getting difference, according to the characteristics of the first derivative, the original function at this point meets a vertex when the sign of the derivative changes.

The following is the visualized progress (normalized).

Implement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public record SlopeSignModel(double Index, int SlopeSign, int AreaSign)
{
public static readonly IEqualityComparer<SlopeSignModel> Comparer
= new SlopeSignEqualityComparer();

class SlopeSignEqualityComparer : IEqualityComparer<SlopeSignModel>
{
public bool Equals(SlopeSignModel x, SlopeSignModel y)
{
return (x.SlopeSign == y.SlopeSign)
// && (x.AreaSign == y.AreaSign)
;
}

public int GetHashCode(SlopeSignModel obj)
{
return obj.GetHashCode();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
var currentAreaSign = 0;
var slopeSignal = new Subject<SlopeSignModel>();

Subject<Sample<double, double>>.ObserveOn(RxApp.TaskpoolScheduler)
// Downsample
.Buffer(Context.ProcessorDownsampleCount, Context.ProcessorDownsampleCount)
.Select(x => (x[0].Index, x.Average(y => y.Value)))
// Threshold Filter
.Select(x =>
{
var abs = Math.Abs(x.Item2);
return abs > Context.ThresholdVoltage ? x : (x.Index, 0);
})
.Do(x =>
{
var sig = Math.Sign(x.Item2);
currentAreaSign = sig == 1 ? 1 : -1;
})
// Calc Slope
.Buffer(2, 1)
.Select(x => (x[0].Index, (x[1].Item2 - x[0].Item2) / (x[1].Index - x[0].Index), Math.Sign(x[0].Item2)))
.Select(x => new SlopeSignModel(x.Index, Math.Sign(x.Item2), x.Item3))
.Where(x => x.AreaSign != currentAreaSign)
.Subscribe(slopeSignal)
.DisposeWith(_d);

Observable.Create((IObserver<int> o) =>
{
var lastSign = 0;

return ch1SlopeSignal
.ObserveOn(RxApp.TaskpoolScheduler)
.Subscribe(x =>
{
if (lastSign != x.SlopeSign)
{
if (lastSign == 0)
{
lastSign = x.SlopeSign;
}
else
{
lastSign = x.SlopeSign;

o.OnNext(lastSign);
lastSign = 0;
}
}
});
})
.Subscribe()
.DisposeWith(_d);