Skip to content

Commit

Permalink
Merge pull request #22 from JuliaTelecom/sjk/readbuf1
Browse files Browse the repository at this point in the history
Large buffers
  • Loading branch information
sjkelly authored Sep 14, 2021
2 parents 836687a + 0d4faca commit bbce73b
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 74 deletions.
74 changes: 39 additions & 35 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,44 @@ julia> using SoapySDR, SoapyRTLSDR_jll

### Transmitting and Receiving

TX:
```
# Open first TX-capable channel on first device
channel = Devices()[1].tx[1]
# Configure channel with appropriate parameters
channel.bandwidth = 800u"kHz"
channel.frequency = 30u"MHz"
channel.gain = 42u"dB"
channel.sample_rate = 2.1u"MHz"
# Open a (potentially multichannel) stream on this channel
stream = SoapySDR.Stream([channel])
SoapySDR.activate!(stream)
# Write out random noise
Base.write(stream, (randn(ComplexF32, 10000),))
```

RX:
```
# Open first RX-capable channel on first device
channel = Devices()[1].rx[1]
# Configure channel with appropriate parameters
channel.bandwidth = 800u"kHz"
channel.frequency = 30u"MHz"
channel.gain = 42u"dB"
channel.sample_rate = 2.1u"MHz"
# Open a (potentially multichannel) stream on this channel
stream = SoapySDR.Stream([channel])
SoapySDR.activate!(stream)
# Collect all available samples in the buffer
Base.read(stream)
# Open all TX-capable channels on first device
tx_channels = Devices()[1].tx
# Open all RX-capable channels on first device
rx_channels = Devices()[1].rx
# Configure a TX channel with appropriate parameters
# configure the RX channel with similar for e.g. a loopback test
# Be sure to check your local regulations before transmitting!
tx_channel[1].bandwidth = 800u"kHz"
tx_channel[1].frequency = 30u"MHz"
tx_channel[1].gain = 42u"dB"
tx_channel[1].sample_rate = 2.1u"MHz"
# Open a (potentially multichannel) stream on the channels
tx_stream = SoapySDR.Stream(tx_channels)
rx_stream = SoapySDR.Stream(rx_channels)
# Setup a sample buffer optimized for the device
# The data can be access with e.g. tx_buf.bufs
# Note: we ask for 10,000 samples, but the API will re-size correctly for the device
tx_buf = SoapySDR.SampleBuffer(tx_stream, 10_000)
rx_buf = SoapySDR.SampleBuffer(rx_stream, 10_000)
# Setup some data to transmit on each channel
for i in eachindex(tx_buf)
tx_buf[i] = randn(SoapySDR.streamtype(tx_stream), length(tx_buf))
end
# Spawn two tasks for full duplex operation
# The tasks will run in parallel and for best resuslts run julia with --threads=auto
read_task = Threads.@spawn read!(rx_stream, rx_buf)
write_task = Threads.@spawn write(tx_stream, tx_buf)
# Wait for the tasks to complete
wait(read_task)
wait(write_task)
@show rx_buf[1][1:100] # show the first 100 samples of the first buffer
```
35 changes: 35 additions & 0 deletions examples/rapid_read.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using SoapySDR, SoapyRTLSDR_jll

# Here we want to test the behavior in a loop, to ensure
# that we can block on buffer overflow conditions, and
# handle partial reads, measure latnecy, etc

function rapid_read()
dev = Devices()[1]
rx_chan = dev.rx
rx_stream = SoapySDR.Stream(rx_chan)
@show SoapySDR.mtu(rx_stream)
SoapySDR.activate!(rx_stream)
bufs = [SoapySDR.SampleBuffer(rx_stream, 10^6) for i = 1:2]
@show bufs[1].packet_count
@show bufs[2].packet_count
flip = true
while true
# double buffer
flip = !flip
current_buff = bufs[Int(flip)+1]
prev_buff = bufs[Int(!flip)+1]
@assert length(current_buff.bufs[1])%rx_stream.mtu == 0

read!(rx_stream, current_buff)

# sanity checks?
#nequal = 0
#for i in eachindex(current_buff.bufs)
# nequal += Int(current_buff.bufs[1][i] == prev_buff.bufs[1][i])
#end
#@show current_buff.timens
#@show nequal, current_buff.timens, delta_t
end

end
145 changes: 106 additions & 39 deletions src/highlevel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ Base.cconvert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s
Base.unsafe_convert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s.ptr
SoapySDRDevice_closeStream(s::Stream) = SoapySDRDevice_closeStream(s.d, s)

streamtype(::Stream{T}) where T = T
mtu(s::Stream) = s.mtu

function Base.show(io::IO, s::Stream)
print(io, "Stream on ", s.d.hardware)
end
Expand Down Expand Up @@ -528,50 +531,97 @@ function Stream(channels::AbstractVector{T}; kwargs...) where {T <: Channel}
Stream(native_format, channels; kwargs...)
end

function _read!(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout=nothing) where {N, T}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream
buflen = length(first(buffers))
@assert all(buffer->length(buffer) == buflen, buffers)
@assert N == s.nchannels
nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(pointer, buffers)), buflen, uconvert(u"μs", timeout).val)
timens = timens * u"ns"
nread, flags, timens
end
"""
SampleBuffer(s::SoapySDR.Stream)
SampleBuffer(s::SoapySDR.Stream, n::Int)
function Base.read!(s, buffers; kwargs...)
_read!(s, buffers; kwargs...)[1]
end
Constructs a sample buffer for a given stream. Can contain multiple channels and be of arbitrary length.
To avoid undefined behavior, this requested length with be aligned to the device MTU. It is therefore
important to ensure that subsequent calls and calculations use this length.
Returns a `SampleBuffer{N,T}` with fields:
bufs::NTuple{N, T}
packet_count::Int
timens::Vector{Pair{Int,typeof(1u"ns")}}
where N is the number of channels and T is the vector type of the buffer (default: Vector).
`bufs` are the buffers for each channel.
`length` length of the buffer.
`packet_count` are the number of transactions of MTU size required by subsequent `read` and `write` operations.
`timens` are the offset and time stamp pairs for each packet.
"""
struct SampleBuffer{N, T}
bufs::NTuple{N, Vector{T}}
flags::Cint
timens::typeof(1u"ns")
bufs::NTuple{N, T}
length::Int
packet_count::Int
timens::Vector{Pair{Int, typeof(1u"ns")}}
end
Base.length(sb::SampleBuffer) = length(first(sb.bufs))
Base.getindex(sb::SampleBuffer, i::Int) = sb.bufs[i]
Base.setindex(sb::SampleBuffer, i::Int, v) = (sb.bufs[i] = v)
Base.eachindex(::SampleBuffer{N,T}) where {N, T} = 1:N
SampleBuffer(s::Stream) = SampleBuffer(s, s.mtu)

function SampleBuffer(s::Stream{T}, length; round::RoundingMode{RM}=RoundDown, vectortype=Vector) where {T, RM}

# align to MTU
overrun = length%s.mtu
realigned = false
if length < s.mtu
length = s.mtu
realigned = true
elseif length > s.mtu && overrun != 0
length = if RM == :Down
length - overrun
elseif RM == :Up
length + s.mtu - overrun
end
realigned = true
end
if realigned
@info "requested 'length' is not aligned to MTU! Aligning to length of $(length) samples"
@info "get MTU with SoapySDR.mtu(::Stream)."
end

packet_count = Int(length/s.mtu)
bufs = ntuple(_->vectortype{T}(undef, length), s.nchannels)
SampleBuffer(bufs, length, packet_count, Vector{Pair{Int, typeof(1u"ns")}}(undef, packet_count))
end
Base.length(sb::SampleBuffer) = length(sb.bufs[1])

"""
read(s::SoapySDR.Stream, nb::Integer; all=true)

Read at most nb bytes from s, returning a `SampleBuffer`
"""
read!(s::SoapySDR.Stream, buf::SampleBuffer; timeout::Int)
If all is true (the default), this function will block repeatedly trying to read all requested bytes, until an error or
end-of-file occurs. If all is false, at most one read call is performed, and the amount of data returned is device-dependent.
Note that not all stream types support the all option.
Read data from the device into the given buffer.
"""
function Base.read(s::Stream{T}, n::Int; all=true, kwargs...) where {T}
bufs = ntuple(_->Vector{T}(undef, n), s.nchannels)
nread, flags, timens = _read!(s, bufs; kwargs...)
# By definition of read, we can allow fewer samples than requested, unless all=false
if nread != n && all
@info "could not read requested length, suggest using read(...;all=false)"
@info("assertion debugging", nread, n)
@assert nread == n
function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=nothing, activate=true, deactivate=true) where {N, T, VT <: AbstractVector{T}}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream

# check length did not change
for i in eachindex(samplebuffer.bufs)
@assert length(samplebuffer.bufs[i]) == samplebuffer.length
end
SampleBuffer(bufs, flags, timens)
end

function Base.read(s::Stream; all=true, kwargs...)
read(s, s.mtu; all=true, kwargs...)
activate && activate!(s)
for packet in 1:samplebuffer.packet_count
offset = (packet-1)*s.mtu
@show offset
nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, uconvert(u"μs", timeout).val)
timens = timens * u"ns"

@assert flags & SOAPY_SDR_MORE_FRAGMENTS == 0

if nread != s.mtu
@info("assertion debugging", nread, n)
@assert nread == n
end

samplebuffer.timens[packet] = (offset => timens)
end
deactivate && deactivate!(s)

samplebuffer
end

function activate!(s::Stream; flags = 0, timens = nothing, numElems=0)
Expand All @@ -584,12 +634,29 @@ function deactivate!(s::Stream; flags = 0, timens = nothing)
nothing
end

function Base.write(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout = nothing) where {N, T}
function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = nothing, activate=true, deactivate=true) where {N, T, VT <: AbstractVector{T}}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream
buflen = length(first(buffers))
@assert all(buffer->length(buffer) == buflen, buffers)
@assert N == s.nchannels
SoapySDRDevice_writeStream(s.d, s, Ref(map(pointer, buffers)), buflen, 0, 0, uconvert(u"μs", timeout).val)

# check length did not change
for i in eachindex(samplebuffer.bufs)
@assert length(samplebuffer.bufs[i]) == samplebuffer.length
end

activate && activate!(s)
for packet in 1:samplebuffer.packet_count
offset = (packet-1)*s.mtu

nelem, flags = SoapySDRDevice_writeStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, 0, 0, uconvert(u"μs", timeout).val)

if nelem != s.mtu
@info("assertion debugging", nelem, n)
@assert nelem == n
end

end
deactivate && deactivate!(s)

samplebuffer
end


Expand Down

0 comments on commit bbce73b

Please sign in to comment.