Skip to content

Commit

Permalink
rewrite: cleanup, handle sender restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
ka9q committed Aug 8, 2023
1 parent 34f6074 commit 900db70
Showing 1 changed file with 99 additions and 149 deletions.
248 changes: 99 additions & 149 deletions pcmcat.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// $Id: pcmcat.c,v 1.22 2022/12/29 05:55:43 karn Exp $
// Receive and stream PCM RTP data to stdout
// Should emit .wav format by default to encode sample rate & parameters for subsequent encoding
// Revised Aug 2023 to more cleanly handle sender restarts

#define _GNU_SOURCE 1
#include <assert.h>
Expand All @@ -21,8 +22,6 @@
#include "multicast.h"

struct pcmstream {
struct pcmstream *prev; // Linked list pointers
struct pcmstream *next;
uint32_t ssrc; // RTP Sending Source ID
int type; // RTP type (10,11,20)

Expand All @@ -31,27 +30,25 @@ struct pcmstream {
char port[NI_MAXSERV]; // RTP Sender source port

struct rtp_state rtp_state;
int channels;

};

// Config constants
int const Bufsize = 2048;
float const Samprate = 48000;
int const Bufsize = 9000; // allow for jumbograms

// Command line params
char *Mcast_address_text;
char const *Mcast_address_text;
int Quiet;
int Stereo; // Force stereo output; otherwise output mono, downmixing if necessary
int Channels = 1; // Output channels
const char *App_path;
int Verbose;

int Input_fd = -1;
struct pcmstream *Pcmstream;
int Sessions; // Session count - limit to 1 for now
struct pcmstream Pcmstream;
uint32_t Ssrc; // Requested SSRC

struct pcmstream *lookup_session(const struct sockaddr *sender,const uint32_t ssrc);
struct pcmstream *make_session(struct sockaddr const *sender,uint32_t ssrc,uint16_t seq,uint32_t timestamp);
int close_session(struct pcmstream *sp);
static int init(struct pcmstream *pc,struct rtp_header const *rtp,struct sockaddr const *sender);

int main(int argc,char *argv[]){
App_path = argv[0];
Expand All @@ -61,7 +58,7 @@ int main(int argc,char *argv[]){
while((c = getopt(argc,argv,"qhs:2")) != EOF){
switch(c){
case '2': // Force stereo
Stereo++;
Channels = 2;
break;
case 'v':
Verbose++;
Expand Down Expand Up @@ -101,6 +98,7 @@ int main(int argc,char *argv[]){
struct sockaddr sender;
socklen_t socksize = sizeof(sender);
uint8_t buffer[Bufsize];
// Gets all packets to multicast destination address, regardless of sender IP, sender port, dest port, ssrc
int size = recvfrom(Input_fd,buffer,sizeof(buffer),0,&sender,&socksize);
if(size == -1){
if(errno != EINTR){ // Happens routinely
Expand All @@ -114,6 +112,7 @@ int main(int argc,char *argv[]){

struct rtp_header rtp;
uint8_t const *dp = ntoh_rtp(&rtp,buffer);

size -= dp - buffer;
if(rtp.pad){
// Remove padding
Expand All @@ -123,157 +122,108 @@ int main(int argc,char *argv[]){
if(size <= 0)
continue;

struct pcmstream *sp = lookup_session(&sender,rtp.ssrc);
if(sp == NULL){
// Not found
if(Sessions || (Ssrc !=0 && rtp.ssrc != Ssrc)){
// Only take specified SSRC or first SSRC for now
continue;
}
if((sp = make_session(&sender,rtp.ssrc,rtp.seq,rtp.timestamp)) == NULL){
fprintf(stderr,"No room for new session!!\n");
continue;
}
getnameinfo((struct sockaddr *)&sender,sizeof(sender),sp->addr,sizeof(sp->addr),
// sp->port,sizeof(sp->port),NI_NOFQDN|NI_DGRAM|NI_NUMERICHOST);
sp->port,sizeof(sp->port),NI_NOFQDN|NI_DGRAM);
if(rtp.ssrc == 0 || (Ssrc != 0 && rtp.ssrc != Ssrc))
continue; // Ignore unwanted or invalid SSRCs

if(Pcmstream.ssrc == 0){
// First packet on stream, initialize
init(&Pcmstream,&rtp,&sender);

if(!Quiet){
fprintf(stderr,"New session from %u@%s:%s, type %d",sp->ssrc,sp->addr,sp->port,rtp.type);

switch(rtp.type){
case PCM_STEREO_PT:
case PCM_STEREO_24_PT:
case PCM_STEREO_16_PT:
case PCM_STEREO_12_PT:
case PCM_STEREO_8_PT:
fprintf(stderr,", pcm stereo");
if(!Stereo)
fprintf(stderr,", downmixing to mono");
break;
case PCM_MONO_PT:
case PCM_MONO_24_PT:
case PCM_MONO_16_PT:
case PCM_MONO_12_PT:
case PCM_MONO_8_PT:
fprintf(stderr,", pcm mono");
if(Stereo)
fprintf(stderr,", expanding to pseudo-stereo");
break;
}
fprintf(stderr,"\n");
fprintf(stderr,"New session from %u@%s:%s, type %d, channels %d\n",
Pcmstream.ssrc,
Pcmstream.addr,
Pcmstream.port,rtp.type,Pcmstream.channels);
}
} else if(rtp.ssrc != Pcmstream.ssrc)
continue; // unwanted SSRC, ignore

Sessions++;
if(!address_match(&sender,&Pcmstream.sender) || getportnumber(&Pcmstream.sender) != getportnumber(&sender)){
// Source changed, the sender restarted
init(&Pcmstream,&rtp,&sender);
if(!Quiet){
fprintf(stderr,"Session restart from %u@%s:%s\n",
Pcmstream.ssrc,
Pcmstream.addr,
Pcmstream.port);
}
}
int samples_skipped = rtp_process(&sp->rtp_state,&rtp,0); // get rid of last arg
if(samples_skipped < 0)
continue; // old dupe? What if it's simply out of sequence?
if(rtp.marker)
Pcmstream.rtp_state.timestamp = rtp.timestamp; // Resynch

sp->type = rtp.type;
int samples = 0;
if(Pcmstream.channels != channels_from_pt(rtp.type)){
if(!Quiet)
fprintf(stderr,"Channel count changed from %d to %d\n",Pcmstream.channels,channels_from_pt(rtp.type));
Pcmstream.channels = channels_from_pt(rtp.type);
}
if(Pcmstream.channels != 1 && Pcmstream.channels != 2)
continue; // Invalid

int16_t *sdp = (int16_t *)dp;
switch(rtp.type){
case PCM_STEREO_PT:
case PCM_STEREO_24_PT:
case PCM_STEREO_16_PT:
case PCM_STEREO_12_PT:
case PCM_STEREO_8_PT:

samples = size / 4;
while(samples-- > 0){
// Swap sample to host order, cat to stdout
int16_t left = ntohs(*sdp++);
int16_t right = ntohs(*sdp++);
if(Stereo){
fwrite(&left,sizeof(left),1,stdout);
fwrite(&right,sizeof(right),1,stdout);
} else {
// Downmix to mono
int16_t samp = (left + right) / 2;
fwrite(&samp,sizeof(samp),1,stdout);
}
}
break;
case PCM_MONO_PT: // Mono
case PCM_MONO_24_PT:
case PCM_MONO_16_PT:
case PCM_MONO_12_PT:
case PCM_MONO_8_PT:
samples = size / 2;
while(samples-- > 0){
// Swap sample to host order, cat to stdout
int16_t d = ntohs(*sdp++);
fwrite(&d,sizeof(d),1,stdout);
if(Stereo)
fwrite(&d,sizeof(d),1,stdout); // Force to pseudo-stereo
int const time_step = (int32_t)(rtp.timestamp - Pcmstream.rtp_state.timestamp);
if(time_step < 0){
// Old dupe
Pcmstream.rtp_state.dupes++;
continue;
} else if(time_step > 0){
Pcmstream.rtp_state.drops++;
fprintf(stderr,"Drops %llu\n",Pcmstream.rtp_state.drops);
if(time_step < 48000){ // Arbitrary threshold - clean this up!
int16_t zeroes[time_step];
memset(zeroes,0,sizeof(zeroes));
fwrite(zeroes,sizeof(*zeroes),time_step,stdout);
if(Channels == 2)
fwrite(zeroes,sizeof(*zeroes),time_step,stdout); // Write it twice
}
break;
default:
samples = 0;
break; // ignore
// Resync
Pcmstream.rtp_state.timestamp = rtp.timestamp; // Bring up to date?
}
Pcmstream.rtp_state.bytes += size;

int const sampcount = size / sizeof(int16_t); // # of 16-bit samples, regardless of mono or stereo
int const framecount = sampcount / Pcmstream.channels; // == sampcount for mono, sampcount/2 for stereo
int16_t * const sdp = (int16_t *)dp;

// Byte swap incoming buffer, regardless of channels
for(int i=0; i < sampcount; i++)
sdp[i] = ntohs(sdp[i]);

if(Channels == Pcmstream.channels) {
fwrite(sdp,sizeof(*sdp),sampcount,stdout); // Both mono or stereo, no expansion/mixing needed
} else if(Channels == 1 && Pcmstream.channels == 2) {
for(int i=0; i < framecount; i++) // Downmix to mono
sdp[i] = (sdp[2*i] + sdp[2*i + 1]) / 2;

fwrite(sdp,sizeof(*sdp),framecount,stdout);
} else {
// Expand to pseudo-stereo
int16_t output[2*sampcount];
for(int i=0; i < sampcount; i++)
output[2*i] = output[2*i+1] = sdp[i];

fwrite(output,sizeof(*output),sampcount*2,stdout);
}
fflush(stdout);
Pcmstream.rtp_state.timestamp += framecount;
Pcmstream.rtp_state.seq = rtp.seq + 1;
}
exit(0);
}



struct pcmstream *lookup_session(const struct sockaddr *sender,const uint32_t ssrc){
struct pcmstream *sp;
for(sp = Pcmstream; sp != NULL; sp = sp->next){
if(sp->ssrc == ssrc && address_match(&sp->sender,sender)){
// Found it
if(sp->prev != NULL){
// Not at top of bucket chain; move it there
if(sp->next != NULL)
sp->next->prev = sp->prev;

sp->prev->next = sp->next;
sp->prev = NULL;
sp->next = Pcmstream;
Pcmstream = sp;
}
return sp;
}
}
return NULL;
}
// Create a new session, partly initialize
struct pcmstream *make_session(struct sockaddr const *sender,uint32_t ssrc,uint16_t seq,uint32_t timestamp){
struct pcmstream *sp;

if((sp = calloc(1,sizeof(*sp))) == NULL)
return NULL; // Shouldn't happen on modern machines!
static int init(struct pcmstream *pc,struct rtp_header const *rtp,struct sockaddr const *sender){
// First packet on stream, initialize
pc->ssrc = rtp->ssrc;
pc->type = rtp->type;
pc->channels = channels_from_pt(rtp->type);

// Initialize entry
memcpy(&sp->sender,sender,sizeof(struct sockaddr));
sp->ssrc = ssrc;

// Put at head of bucket chain
sp->next = Pcmstream;
if(sp->next != NULL)
sp->next->prev = sp;
Pcmstream = sp;
return sp;
}

int close_session(struct pcmstream *sp){
if(sp == NULL)
return -1;

// Remove from linked list
if(sp->next != NULL)
sp->next->prev = sp->prev;
if(sp->prev != NULL)
sp->prev->next = sp->next;
else
Pcmstream = sp->next;
FREE(sp);
memcpy(&pc->sender,sender,sizeof(pc->sender)); // Remember sender
getnameinfo((struct sockaddr *)&pc->sender,sizeof(pc->sender),
pc->addr,sizeof(pc->addr),
pc->port,sizeof(pc->port),NI_NOFQDN|NI_DGRAM);
pc->rtp_state.timestamp = rtp->timestamp;
pc->rtp_state.seq = rtp->seq;
pc->rtp_state.packets = 0;
pc->rtp_state.bytes = 0;
pc->rtp_state.drops = 0;
pc->rtp_state.dupes = 0;
return 0;
}



0 comments on commit 900db70

Please sign in to comment.