diff --git a/pcmcat.c b/pcmcat.c index e3800f61..7d0b95ee 100644 --- a/pcmcat.c +++ b/pcmcat.c @@ -1,6 +1,7 @@ // $Id: pcmcat.c,v 1.22 2022/12/29 05:55:43 karn Exp $ // Receive and stream PCM RTP data to stdout // Should emit .wav format by default to encode sample rate & parameters for subsequent encoding +// Revised Aug 2023 to more cleanly handle sender restarts #define _GNU_SOURCE 1 #include @@ -21,8 +22,6 @@ #include "multicast.h" struct pcmstream { - struct pcmstream *prev; // Linked list pointers - struct pcmstream *next; uint32_t ssrc; // RTP Sending Source ID int type; // RTP type (10,11,20) @@ -31,27 +30,25 @@ struct pcmstream { char port[NI_MAXSERV]; // RTP Sender source port struct rtp_state rtp_state; + int channels; + }; // Config constants -int const Bufsize = 2048; -float const Samprate = 48000; +int const Bufsize = 9000; // allow for jumbograms // Command line params -char *Mcast_address_text; +char const *Mcast_address_text; int Quiet; -int Stereo; // Force stereo output; otherwise output mono, downmixing if necessary +int Channels = 1; // Output channels const char *App_path; int Verbose; int Input_fd = -1; -struct pcmstream *Pcmstream; -int Sessions; // Session count - limit to 1 for now +struct pcmstream Pcmstream; uint32_t Ssrc; // Requested SSRC -struct pcmstream *lookup_session(const struct sockaddr *sender,const uint32_t ssrc); -struct pcmstream *make_session(struct sockaddr const *sender,uint32_t ssrc,uint16_t seq,uint32_t timestamp); -int close_session(struct pcmstream *sp); +static int init(struct pcmstream *pc,struct rtp_header const *rtp,struct sockaddr const *sender); int main(int argc,char *argv[]){ App_path = argv[0]; @@ -61,7 +58,7 @@ int main(int argc,char *argv[]){ while((c = getopt(argc,argv,"qhs:2")) != EOF){ switch(c){ case '2': // Force stereo - Stereo++; + Channels = 2; break; case 'v': Verbose++; @@ -101,6 +98,7 @@ int main(int argc,char *argv[]){ struct sockaddr sender; socklen_t socksize = sizeof(sender); uint8_t buffer[Bufsize]; + // Gets all packets to multicast destination address, regardless of sender IP, sender port, dest port, ssrc int size = recvfrom(Input_fd,buffer,sizeof(buffer),0,&sender,&socksize); if(size == -1){ if(errno != EINTR){ // Happens routinely @@ -114,6 +112,7 @@ int main(int argc,char *argv[]){ struct rtp_header rtp; uint8_t const *dp = ntoh_rtp(&rtp,buffer); + size -= dp - buffer; if(rtp.pad){ // Remove padding @@ -123,157 +122,108 @@ int main(int argc,char *argv[]){ if(size <= 0) continue; - struct pcmstream *sp = lookup_session(&sender,rtp.ssrc); - if(sp == NULL){ - // Not found - if(Sessions || (Ssrc !=0 && rtp.ssrc != Ssrc)){ - // Only take specified SSRC or first SSRC for now - continue; - } - if((sp = make_session(&sender,rtp.ssrc,rtp.seq,rtp.timestamp)) == NULL){ - fprintf(stderr,"No room for new session!!\n"); - continue; - } - getnameinfo((struct sockaddr *)&sender,sizeof(sender),sp->addr,sizeof(sp->addr), - // sp->port,sizeof(sp->port),NI_NOFQDN|NI_DGRAM|NI_NUMERICHOST); - sp->port,sizeof(sp->port),NI_NOFQDN|NI_DGRAM); + if(rtp.ssrc == 0 || (Ssrc != 0 && rtp.ssrc != Ssrc)) + continue; // Ignore unwanted or invalid SSRCs + if(Pcmstream.ssrc == 0){ + // First packet on stream, initialize + init(&Pcmstream,&rtp,&sender); + if(!Quiet){ - fprintf(stderr,"New session from %u@%s:%s, type %d",sp->ssrc,sp->addr,sp->port,rtp.type); - - switch(rtp.type){ - case PCM_STEREO_PT: - case PCM_STEREO_24_PT: - case PCM_STEREO_16_PT: - case PCM_STEREO_12_PT: - case PCM_STEREO_8_PT: - fprintf(stderr,", pcm stereo"); - if(!Stereo) - fprintf(stderr,", downmixing to mono"); - break; - case PCM_MONO_PT: - case PCM_MONO_24_PT: - case PCM_MONO_16_PT: - case PCM_MONO_12_PT: - case PCM_MONO_8_PT: - fprintf(stderr,", pcm mono"); - if(Stereo) - fprintf(stderr,", expanding to pseudo-stereo"); - break; - } - fprintf(stderr,"\n"); + fprintf(stderr,"New session from %u@%s:%s, type %d, channels %d\n", + Pcmstream.ssrc, + Pcmstream.addr, + Pcmstream.port,rtp.type,Pcmstream.channels); } + } else if(rtp.ssrc != Pcmstream.ssrc) + continue; // unwanted SSRC, ignore - Sessions++; + if(!address_match(&sender,&Pcmstream.sender) || getportnumber(&Pcmstream.sender) != getportnumber(&sender)){ + // Source changed, the sender restarted + init(&Pcmstream,&rtp,&sender); + if(!Quiet){ + fprintf(stderr,"Session restart from %u@%s:%s\n", + Pcmstream.ssrc, + Pcmstream.addr, + Pcmstream.port); + } } - int samples_skipped = rtp_process(&sp->rtp_state,&rtp,0); // get rid of last arg - if(samples_skipped < 0) - continue; // old dupe? What if it's simply out of sequence? + if(rtp.marker) + Pcmstream.rtp_state.timestamp = rtp.timestamp; // Resynch - sp->type = rtp.type; - int samples = 0; + if(Pcmstream.channels != channels_from_pt(rtp.type)){ + if(!Quiet) + fprintf(stderr,"Channel count changed from %d to %d\n",Pcmstream.channels,channels_from_pt(rtp.type)); + Pcmstream.channels = channels_from_pt(rtp.type); + } + if(Pcmstream.channels != 1 && Pcmstream.channels != 2) + continue; // Invalid - int16_t *sdp = (int16_t *)dp; - switch(rtp.type){ - case PCM_STEREO_PT: - case PCM_STEREO_24_PT: - case PCM_STEREO_16_PT: - case PCM_STEREO_12_PT: - case PCM_STEREO_8_PT: - - samples = size / 4; - while(samples-- > 0){ - // Swap sample to host order, cat to stdout - int16_t left = ntohs(*sdp++); - int16_t right = ntohs(*sdp++); - if(Stereo){ - fwrite(&left,sizeof(left),1,stdout); - fwrite(&right,sizeof(right),1,stdout); - } else { - // Downmix to mono - int16_t samp = (left + right) / 2; - fwrite(&samp,sizeof(samp),1,stdout); - } - } - break; - case PCM_MONO_PT: // Mono - case PCM_MONO_24_PT: - case PCM_MONO_16_PT: - case PCM_MONO_12_PT: - case PCM_MONO_8_PT: - samples = size / 2; - while(samples-- > 0){ - // Swap sample to host order, cat to stdout - int16_t d = ntohs(*sdp++); - fwrite(&d,sizeof(d),1,stdout); - if(Stereo) - fwrite(&d,sizeof(d),1,stdout); // Force to pseudo-stereo + int const time_step = (int32_t)(rtp.timestamp - Pcmstream.rtp_state.timestamp); + if(time_step < 0){ + // Old dupe + Pcmstream.rtp_state.dupes++; + continue; + } else if(time_step > 0){ + Pcmstream.rtp_state.drops++; + fprintf(stderr,"Drops %llu\n",Pcmstream.rtp_state.drops); + if(time_step < 48000){ // Arbitrary threshold - clean this up! + int16_t zeroes[time_step]; + memset(zeroes,0,sizeof(zeroes)); + fwrite(zeroes,sizeof(*zeroes),time_step,stdout); + if(Channels == 2) + fwrite(zeroes,sizeof(*zeroes),time_step,stdout); // Write it twice } - break; - default: - samples = 0; - break; // ignore + // Resync + Pcmstream.rtp_state.timestamp = rtp.timestamp; // Bring up to date? + } + Pcmstream.rtp_state.bytes += size; + + int const sampcount = size / sizeof(int16_t); // # of 16-bit samples, regardless of mono or stereo + int const framecount = sampcount / Pcmstream.channels; // == sampcount for mono, sampcount/2 for stereo + int16_t * const sdp = (int16_t *)dp; + + // Byte swap incoming buffer, regardless of channels + for(int i=0; i < sampcount; i++) + sdp[i] = ntohs(sdp[i]); + + if(Channels == Pcmstream.channels) { + fwrite(sdp,sizeof(*sdp),sampcount,stdout); // Both mono or stereo, no expansion/mixing needed + } else if(Channels == 1 && Pcmstream.channels == 2) { + for(int i=0; i < framecount; i++) // Downmix to mono + sdp[i] = (sdp[2*i] + sdp[2*i + 1]) / 2; + + fwrite(sdp,sizeof(*sdp),framecount,stdout); + } else { + // Expand to pseudo-stereo + int16_t output[2*sampcount]; + for(int i=0; i < sampcount; i++) + output[2*i] = output[2*i+1] = sdp[i]; + + fwrite(output,sizeof(*output),sampcount*2,stdout); } fflush(stdout); + Pcmstream.rtp_state.timestamp += framecount; + Pcmstream.rtp_state.seq = rtp.seq + 1; } exit(0); } - - - -struct pcmstream *lookup_session(const struct sockaddr *sender,const uint32_t ssrc){ - struct pcmstream *sp; - for(sp = Pcmstream; sp != NULL; sp = sp->next){ - if(sp->ssrc == ssrc && address_match(&sp->sender,sender)){ - // Found it - if(sp->prev != NULL){ - // Not at top of bucket chain; move it there - if(sp->next != NULL) - sp->next->prev = sp->prev; - - sp->prev->next = sp->next; - sp->prev = NULL; - sp->next = Pcmstream; - Pcmstream = sp; - } - return sp; - } - } - return NULL; -} -// Create a new session, partly initialize -struct pcmstream *make_session(struct sockaddr const *sender,uint32_t ssrc,uint16_t seq,uint32_t timestamp){ - struct pcmstream *sp; - - if((sp = calloc(1,sizeof(*sp))) == NULL) - return NULL; // Shouldn't happen on modern machines! +static int init(struct pcmstream *pc,struct rtp_header const *rtp,struct sockaddr const *sender){ + // First packet on stream, initialize + pc->ssrc = rtp->ssrc; + pc->type = rtp->type; + pc->channels = channels_from_pt(rtp->type); - // Initialize entry - memcpy(&sp->sender,sender,sizeof(struct sockaddr)); - sp->ssrc = ssrc; - - // Put at head of bucket chain - sp->next = Pcmstream; - if(sp->next != NULL) - sp->next->prev = sp; - Pcmstream = sp; - return sp; -} - -int close_session(struct pcmstream *sp){ - if(sp == NULL) - return -1; - - // Remove from linked list - if(sp->next != NULL) - sp->next->prev = sp->prev; - if(sp->prev != NULL) - sp->prev->next = sp->next; - else - Pcmstream = sp->next; - FREE(sp); + memcpy(&pc->sender,sender,sizeof(pc->sender)); // Remember sender + getnameinfo((struct sockaddr *)&pc->sender,sizeof(pc->sender), + pc->addr,sizeof(pc->addr), + pc->port,sizeof(pc->port),NI_NOFQDN|NI_DGRAM); + pc->rtp_state.timestamp = rtp->timestamp; + pc->rtp_state.seq = rtp->seq; + pc->rtp_state.packets = 0; + pc->rtp_state.bytes = 0; + pc->rtp_state.drops = 0; + pc->rtp_state.dupes = 0; return 0; } - -