From ea33e4c5cf8d493d41acb5a2f0fe956c323f1ae5 Mon Sep 17 00:00:00 2001 From: Steve Kelly Date: Fri, 10 Sep 2021 20:44:44 -0400 Subject: [PATCH 1/4] SampleBuffer doc string --- src/highlevel.jl | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/highlevel.jl b/src/highlevel.jl index 6a1ee82..79ef96a 100644 --- a/src/highlevel.jl +++ b/src/highlevel.jl @@ -528,7 +528,7 @@ function Stream(channels::AbstractVector{T}; kwargs...) where {T <: Channel} Stream(native_format, channels; kwargs...) end -function _read!(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout=nothing) where {N, T} +function _read!(s::Stream{T}, buffers::NTuple{N, Vector{T}}; all=true, timeout=nothing) where {N, T} timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream buflen = length(first(buffers)) @assert all(buffer->length(buffer) == buflen, buffers) @@ -542,6 +542,16 @@ function Base.read!(s, buffers; kwargs...) _read!(s, buffers; kwargs...)[1] end +""" + SampleBuffer{N, T} + +Represents a buffer of samples, of `N` channels producing data of type `T`. + +Fields: + bufs::NTuple{N, Vector{T}} + flags::Cint + timens::typeof(1u"ns") +""" struct SampleBuffer{N, T} bufs::NTuple{N, Vector{T}} flags::Cint @@ -550,7 +560,7 @@ end Base.length(sb::SampleBuffer) = length(sb.bufs[1]) """ -read(s::SoapySDR.Stream, nb::Integer; all=true) + read(s::SoapySDR.Stream, nb::Integer; all=true) Read at most nb bytes from s, returning a `SampleBuffer` From 5b73691afbed53b68f55bc679c9fe2941b58ca7d Mon Sep 17 00:00:00 2001 From: Steve Kelly Date: Fri, 10 Sep 2021 21:32:26 -0400 Subject: [PATCH 2/4] Rework StreamBuffer and make it the core transaction primitive This reworks StreamBuffer, and makes it the core primitive for transacting via read and write. It is now aware of the Device MTU, and will align to the MTU. It also reworks the time stamps to align to each transaction, supporting multiple time stamps per buffer for latency analysis. We also allow the full vector type to be a Parameter, to support other array types. --- examples/rapid_read.jl | 35 ++++++++++ src/highlevel.jl | 141 +++++++++++++++++++++++++++-------------- 2 files changed, 130 insertions(+), 46 deletions(-) create mode 100644 examples/rapid_read.jl diff --git a/examples/rapid_read.jl b/examples/rapid_read.jl new file mode 100644 index 0000000..7aa8a08 --- /dev/null +++ b/examples/rapid_read.jl @@ -0,0 +1,35 @@ +using SoapySDR, SoapyRTLSDR_jll + +# Here we want to test the behavior in a loop, to ensure +# that we can block on buffer overflow conditions, and +# handle partial reads, measure latnecy, etc + +function rapid_read() + dev = Devices()[1] + rx_chan = dev.rx + rx_stream = SoapySDR.Stream(rx_chan) + @show SoapySDR.mtu(rx_stream) + SoapySDR.activate!(rx_stream) + bufs = [SoapySDR.SampleBuffer(rx_stream, 10^6) for i = 1:2] + @show bufs[1].packet_count + @show bufs[2].packet_count + flip = true + while true + # double buffer + flip = !flip + current_buff = bufs[Int(flip)+1] + prev_buff = bufs[Int(!flip)+1] + @assert length(current_buff.bufs[1])%rx_stream.mtu == 0 + + read!(rx_stream, current_buff) + + # sanity checks? + #nequal = 0 + #for i in eachindex(current_buff.bufs) + # nequal += Int(current_buff.bufs[1][i] == prev_buff.bufs[1][i]) + #end + #@show current_buff.timens + #@show nequal, current_buff.timens, delta_t + end + +end \ No newline at end of file diff --git a/src/highlevel.jl b/src/highlevel.jl index 79ef96a..c1d709d 100644 --- a/src/highlevel.jl +++ b/src/highlevel.jl @@ -497,6 +497,9 @@ Base.cconvert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s Base.unsafe_convert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s.ptr SoapySDRDevice_closeStream(s::Stream) = SoapySDRDevice_closeStream(s.d, s) +streamtype(::Stream{T}) where T = T +mtu(s::Stream) = s.mtu + function Base.show(io::IO, s::Stream) print(io, "Stream on ", s.d.hardware) end @@ -528,60 +531,92 @@ function Stream(channels::AbstractVector{T}; kwargs...) where {T <: Channel} Stream(native_format, channels; kwargs...) end -function _read!(s::Stream{T}, buffers::NTuple{N, Vector{T}}; all=true, timeout=nothing) where {N, T} - timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream - buflen = length(first(buffers)) - @assert all(buffer->length(buffer) == buflen, buffers) - @assert N == s.nchannels - nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(pointer, buffers)), buflen, uconvert(u"μs", timeout).val) - timens = timens * u"ns" - nread, flags, timens -end +""" + SampleBuffer(s::SoapySDR.Stream) + SampleBuffer(s::SoapySDR.Stream, n::Int) -function Base.read!(s, buffers; kwargs...) - _read!(s, buffers; kwargs...)[1] -end +Constructs a sample buffer for a given stream. Can contain multiple channels and be of arbitrary length. +To avoid undefined behavior, this requested length with be aligned to the device MTU. It is therefore +important to ensure that subsequent calls and calculations use this length. -""" - SampleBuffer{N, T} +Returns a `SampleBuffer{N,T}` with fields: + bufs::NTuple{N, T} + packet_count::Int + timens::Vector{Pair{Int,typeof(1u"ns")}} -Represents a buffer of samples, of `N` channels producing data of type `T`. +where N is the number of channels and T is the vector type of the buffer (default: Vector). -Fields: - bufs::NTuple{N, Vector{T}} - flags::Cint - timens::typeof(1u"ns") +`bufs` are the buffers for each channel. +`length` length of the buffer. +`packet_count` are the number of transactions of MTU size required by subsequent `read` and `write` operations. +`timens` are the offset and time stamp pairs for each packet. """ struct SampleBuffer{N, T} - bufs::NTuple{N, Vector{T}} - flags::Cint - timens::typeof(1u"ns") + bufs::NTuple{N, T} + length::Int + packet_count::Int + timens::Vector{Pair{Int, typeof(1u"ns")}} +end +Base.length(sb::SampleBuffer) = length(first(sb.bufs)) + +SampleBuffer(s::Stream) = SampleBuffer(s, s.mtu) + +function SampleBuffer(s::Stream{T}, length; round::RoundingMode{RM}=RoundDown, vectortype=Vector) where {T, RM} + + # align to MTU + overrun = length%s.mtu + realigned = false + if length < s.mtu + length = s.mtu + realigned = true + elseif length > s.mtu && overrun != 0 + length = if RM == :Down + length - overrun + elseif RM == :Up + length + s.mtu - overrun + end + realigned = true + end + if realigned + @info "requested 'length' is not aligned to MTU! Aligning to length of $(length) samples" + @info "get MTU with SoapySDR.mtu(::Stream)." + end + + packet_count = length/s.mtu + bufs = ntuple(_->vectortype{T}(undef, length), s.nchannels) + SampleBuffer(bufs, length, Int(packet_count), Vector{Pair{Int, typeof(1u"ns")}}(undef, length)) end -Base.length(sb::SampleBuffer) = length(sb.bufs[1]) -""" - read(s::SoapySDR.Stream, nb::Integer; all=true) -Read at most nb bytes from s, returning a `SampleBuffer` +""" + read!(s::SoapySDR.Stream, buf::SampleBuffer; timeout::Int) -If all is true (the default), this function will block repeatedly trying to read all requested bytes, until an error or -end-of-file occurs. If all is false, at most one read call is performed, and the amount of data returned is device-dependent. -Note that not all stream types support the all option. +Read data from the device into the given buffer. """ -function Base.read(s::Stream{T}, n::Int; all=true, kwargs...) where {T} - bufs = ntuple(_->Vector{T}(undef, n), s.nchannels) - nread, flags, timens = _read!(s, bufs; kwargs...) - # By definition of read, we can allow fewer samples than requested, unless all=false - if nread != n && all - @info "could not read requested length, suggest using read(...;all=false)" - @info("assertion debugging", nread, n) - @assert nread == n +function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=nothing) where {N, T, VT <: AbstractVector{T}} + timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream + + # check length did not change + for i in eachindex(samplebuffer.bufs) + @assert length(samplebuffer.bufs[i]) == samplebuffer.length end - SampleBuffer(bufs, flags, timens) -end -function Base.read(s::Stream; all=true, kwargs...) - read(s, s.mtu; all=true, kwargs...) + for packet in 1:samplebuffer.packet_count + offset = (packet-1)*s.mtu + @show offset + nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, uconvert(u"μs", timeout).val) + timens = timens * u"ns" + + @assert flags & SOAPY_SDR_MORE_FRAGMENTS == 0 + + if nread != s.mtu + @info("assertion debugging", nread, n) + @assert nread == n + end + + samplebuffer.timens[packet] = (offset => timens) + end + samplebuffer end function activate!(s::Stream; flags = 0, timens = nothing, numElems=0) @@ -594,12 +629,26 @@ function deactivate!(s::Stream; flags = 0, timens = nothing) nothing end -function Base.write(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout = nothing) where {N, T} +function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = nothing) where {N, T, VT <: AbstractVector{T}} timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream - buflen = length(first(buffers)) - @assert all(buffer->length(buffer) == buflen, buffers) - @assert N == s.nchannels - SoapySDRDevice_writeStream(s.d, s, Ref(map(pointer, buffers)), buflen, 0, 0, uconvert(u"μs", timeout).val) + + # check length did not change + for i in eachindex(samplebuffer.bufs) + @assert length(samplebuffer.bufs[i]) == samplebuffer.length + end + + for packet in 1:samplebuffer.packet_count + offset = (packet-1)*s.mtu + + nelem, flags = SoapySDRDevice_writeStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, 0, 0, uconvert(u"μs", timeout).val) + + if nelem != s.mtu + @info("assertion debugging", nelem, n) + @assert nelem == n + end + + end + samplebuffer end From 228f55a4892761a1e14f47115c0a2af9ef266dcf Mon Sep 17 00:00:00 2001 From: Steve Kelly Date: Mon, 13 Sep 2021 21:06:09 -0400 Subject: [PATCH 3/4] several convienences for treating SampleBuffer more like an array --- src/highlevel.jl | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/highlevel.jl b/src/highlevel.jl index c1d709d..ca27dd2 100644 --- a/src/highlevel.jl +++ b/src/highlevel.jl @@ -558,7 +558,9 @@ struct SampleBuffer{N, T} timens::Vector{Pair{Int, typeof(1u"ns")}} end Base.length(sb::SampleBuffer) = length(first(sb.bufs)) - +Base.getindex(sb::SampleBuffer, i::Int) = sb.bufs[i] +Base.setindex(sb::SampleBuffer, i::Int, v) = (sb.bufs[i] = v) +Base.eachindex(::SampleBuffer{N,T}) where {N, T} = 1:N SampleBuffer(s::Stream) = SampleBuffer(s, s.mtu) function SampleBuffer(s::Stream{T}, length; round::RoundingMode{RM}=RoundDown, vectortype=Vector) where {T, RM} @@ -582,9 +584,9 @@ function SampleBuffer(s::Stream{T}, length; round::RoundingMode{RM}=RoundDown, v @info "get MTU with SoapySDR.mtu(::Stream)." end - packet_count = length/s.mtu + packet_count = Int(length/s.mtu) bufs = ntuple(_->vectortype{T}(undef, length), s.nchannels) - SampleBuffer(bufs, length, Int(packet_count), Vector{Pair{Int, typeof(1u"ns")}}(undef, length)) + SampleBuffer(bufs, length, packet_count, Vector{Pair{Int, typeof(1u"ns")}}(undef, packet_count)) end @@ -593,7 +595,7 @@ end Read data from the device into the given buffer. """ -function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=nothing) where {N, T, VT <: AbstractVector{T}} +function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=nothing, activate=true, deactivate=true) where {N, T, VT <: AbstractVector{T}} timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream # check length did not change @@ -601,6 +603,7 @@ function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=not @assert length(samplebuffer.bufs[i]) == samplebuffer.length end + activate && activate!(s) for packet in 1:samplebuffer.packet_count offset = (packet-1)*s.mtu @show offset @@ -616,6 +619,8 @@ function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=not samplebuffer.timens[packet] = (offset => timens) end + deactivate && deactivate!(s) + samplebuffer end @@ -629,7 +634,7 @@ function deactivate!(s::Stream; flags = 0, timens = nothing) nothing end -function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = nothing) where {N, T, VT <: AbstractVector{T}} +function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = nothing, activate=true, deactivate=true) where {N, T, VT <: AbstractVector{T}} timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream # check length did not change @@ -637,6 +642,7 @@ function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = n @assert length(samplebuffer.bufs[i]) == samplebuffer.length end + activate && activate!(s) for packet in 1:samplebuffer.packet_count offset = (packet-1)*s.mtu @@ -648,6 +654,8 @@ function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = n end end + deactivate && deactivate!(s) + samplebuffer end From 0d4faca2b693984e942f10945782c9b4cd35b66a Mon Sep 17 00:00:00 2001 From: Steve Kelly Date: Mon, 13 Sep 2021 21:07:15 -0400 Subject: [PATCH 4/4] rework Quick Start docs with new API --- docs/src/index.md | 74 +++++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/docs/src/index.md b/docs/src/index.md index e4de521..88535f3 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -39,40 +39,44 @@ julia> using SoapySDR, SoapyRTLSDR_jll ### Transmitting and Receiving -TX: ``` -# Open first TX-capable channel on first device -channel = Devices()[1].tx[1] - -# Configure channel with appropriate parameters -channel.bandwidth = 800u"kHz" -channel.frequency = 30u"MHz" -channel.gain = 42u"dB" -channel.sample_rate = 2.1u"MHz" - -# Open a (potentially multichannel) stream on this channel -stream = SoapySDR.Stream([channel]) -SoapySDR.activate!(stream) - -# Write out random noise -Base.write(stream, (randn(ComplexF32, 10000),)) -``` - -RX: -``` -# Open first RX-capable channel on first device -channel = Devices()[1].rx[1] - -# Configure channel with appropriate parameters -channel.bandwidth = 800u"kHz" -channel.frequency = 30u"MHz" -channel.gain = 42u"dB" -channel.sample_rate = 2.1u"MHz" - -# Open a (potentially multichannel) stream on this channel -stream = SoapySDR.Stream([channel]) -SoapySDR.activate!(stream) - -# Collect all available samples in the buffer -Base.read(stream) +# Open all TX-capable channels on first device +tx_channels = Devices()[1].tx + +# Open all RX-capable channels on first device +rx_channels = Devices()[1].rx + +# Configure a TX channel with appropriate parameters +# configure the RX channel with similar for e.g. a loopback test +# Be sure to check your local regulations before transmitting! +tx_channel[1].bandwidth = 800u"kHz" +tx_channel[1].frequency = 30u"MHz" +tx_channel[1].gain = 42u"dB" +tx_channel[1].sample_rate = 2.1u"MHz" + +# Open a (potentially multichannel) stream on the channels +tx_stream = SoapySDR.Stream(tx_channels) +rx_stream = SoapySDR.Stream(rx_channels) + +# Setup a sample buffer optimized for the device +# The data can be access with e.g. tx_buf.bufs +# Note: we ask for 10,000 samples, but the API will re-size correctly for the device +tx_buf = SoapySDR.SampleBuffer(tx_stream, 10_000) +rx_buf = SoapySDR.SampleBuffer(rx_stream, 10_000) + +# Setup some data to transmit on each channel +for i in eachindex(tx_buf) + tx_buf[i] = randn(SoapySDR.streamtype(tx_stream), length(tx_buf)) +end + +# Spawn two tasks for full duplex operation +# The tasks will run in parallel and for best resuslts run julia with --threads=auto +read_task = Threads.@spawn read!(rx_stream, rx_buf) +write_task = Threads.@spawn write(tx_stream, tx_buf) + +# Wait for the tasks to complete +wait(read_task) +wait(write_task) + +@show rx_buf[1][1:100] # show the first 100 samples of the first buffer ```