Loud Noise with externalMedia c(slin16) and Node.js Processing

I’m encountering a tricky issue while integrating my Node.js application with Asterisk via ARI. My goal is to process audio in real-time, specifically for noise suppression using RNNoise.

My Setup:

  • Asterisk Version: 20.12.0
  • Node.js Script: A custom application registered as an ARI application (ari-rnnoise), which utilizes a UDP server to receive and process audio from Asterisk.
  • Operating System: Debian 10

Problem Description:

When I use Stasis(ari-rnnoise,c(slin16)) in my Asterisk dialplan to pass a channel to my Stasis application, explicitly instructing Asterisk to deliver audio in slin16 format (16 kHz, 16-bit signed linear PCM), the audio stream received by my Node.js script (specifically, its embedded UDP server) contains very loud noise instead of clear caller audio. This happens even when I send the received audio back to Asterisk unchanged, or when I write it to a file and play it back.

Details:

  1. Dialplan Snippet:
    exten => _X.,1,NoOp(Starting Stasis app with SLIN16 format)
    same => n,Answer()
    same => n,Stasis(ari-rnnoise,c(slin16)) ; <-- This is the key command
    same => n,Hangup()
  2. Node.js Script Configuration (ari-rnnoise-processor.js snippet):
    My script’s UDP server listens on 0.0.0.0:8089.

The internal constants are set up to expect slin16 (16kHz, 16-bit, mono) audio:

const SLIN16_FRAME_SIZE = 16000 * 0.020 * 2; // 640 bytes per 20 ms frame
const EXPECTED_SAMPLE_RATE = 16000;
const EXPECTED_BIT_DEPTH = 16;
const EXPECTED_CHANNELS = 1;
const RTP_HEADER_SIZE = 12; // standard RTP header

  3. Observed Behavior (ARI Application and UDP Server):
  • An incoming call is passed to my application via Stasis(ari-rnnoise,c(slin16)).
  • My Node.js application receives UDP packets from Asterisk.
  • However, the msg.length of the received UDP packets is consistently 172 bytes (160 bytes of raw audio payload + 12 bytes RTP header).
  • Consequently, the packet size check in my script (if (msg.length !== (SLIN16_FRAME_SIZE + RTP_HEADER_SIZE))) always fails, as 172 bytes is not equal to 652 bytes.
  • When I write the audioChunk (after stripping the 12-byte RTP header, resulting in 160 bytes per frame) to a temporary file and play it back, I hear very loud, unintelligible noise. This strongly indicates that 8 kHz ulaw data (which is 160 bytes per 20ms frame) is being interpreted as 16 kHz slin16 data (which should be 640 bytes per 20ms frame).
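
The arithmetic behind that conclusion, as a quick sanity check (not part of the application):

// Sanity check: bytes per 20 ms audio frame for the two candidate formats.
const RTP_HEADER_BYTES = 12;
const frameBytes = (sampleRate, bytesPerSample) => sampleRate * 0.020 * bytesPerSample;

console.log(frameBytes(8000, 1) + RTP_HEADER_BYTES);  // 172 -> exactly what the UDP server receives
console.log(frameBytes(16000, 2) + RTP_HEADER_BYTES); // 652 -> what the script expects for slin16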

My Assumption:

Despite using the c(slin16) argument in Stasis() to explicitly request slin16 audio for the application, Asterisk appears not to convert the channel’s audio to this format. Instead, it seems to continue sending the original channel format (presumably ulaw or alaw at 8 kHz) to my Stasis application.
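
One way I could confirm this from the application side would be to query the channel's actual formats when it enters Stasis. A sketch, assuming CHANNEL(audionativeformat) and CHANNEL(audioreadformat), which are standard Asterisk dialplan functions readable over ARI:

// Sketch: log the channel's actual formats on StasisStart.
// "client" here is the connected ARI client (this.client in the listing below).
client.on('StasisStart', async (event, channel) => {
    try {
        // Readable over ARI via GET /channels/{id}/variable.
        const native = await channel.getChannelVar({ variable: 'CHANNEL(audionativeformat)' });
        const read = await channel.getChannelVar({ variable: 'CHANNEL(audioreadformat)' });
        console.log(`Channel ${channel.id}: native=${native.value}, read=${read.value}`);
    } catch (err) {
        console.error(`Could not read channel format: ${err.message}`);
    }
});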

Questions:

  1. Is it a known behavior that the c(format) argument in Stasis() does not affect the actual audio format delivered to the Stasis application as expected, particularly for slin16?
  2. Are there any specific configurations in rtp.conf, codecs.conf, or other Asterisk files that might influence this behavior or ensure that Asterisk truly converts the audio to slin16 before streaming it to the Stasis application?
  3. How can I guarantee that my Node.js application genuinely receives 16 kHz, 16-bit slin audio from Asterisk? Or do I need to generally assume I’ll receive 8 kHz ulaw/alaw and design my script to internally convert this data to 16 kHz?
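
In case the answer to question 3 turns out to be "convert it yourself", this is roughly what such a fallback could look like. A sketch using the standard G.711 µ-law expansion plus crude sample doubling; a real implementation would use a proper resampler:

// Fallback sketch: decode 8 kHz G.711 ulaw to 16-bit PCM and double samples to 16 kHz.
function ulawByteToPcm16(u) {
    u = ~u & 0xff;
    let t = ((u & 0x0f) << 3) + 0x84;  // mantissa + bias
    t <<= (u & 0x70) >> 4;             // apply exponent segment
    return (u & 0x80) ? (0x84 - t) : (t - 0x84);
}

function ulaw8kToSlin16(ulawBuf) {
    // Each ulaw byte becomes two 16-bit samples (crude 2x upsampling).
    const out = Buffer.alloc(ulawBuf.length * 4);
    for (let i = 0; i < ulawBuf.length; i++) {
        const s = ulawByteToPcm16(ulawBuf[i]);
        out.writeInt16LE(s, i * 4);
        out.writeInt16LE(s, i * 4 + 2);
    }
    return out; // 160 ulaw bytes in -> 640 slin16 bytes out
}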

Thank you for any help or insights you can provide!

ari-rnnoise-processor.js:

// Import required modules
const client = require('ari-client');
const dgram = require('dgram');
const chalk = require('chalk');
const util = require('util');
const fs = require('fs');
const path = require('path'); // NEW: import path

// Import our new RnnoiseProvider class
const { RnnoiseProvider } = require('./rnnoise-provider');

// Define the RTP packet size for slin16 (20 ms frames at 16 kHz, 16-bit samples)
const SLIN16_FRAME_SIZE = 16000 * 0.020 * 2; // 16000 Hz * 0.020 s * 2 bytes/sample = 640 bytes
const EXPECTED_SAMPLE_RATE = 16000;
const EXPECTED_BIT_DEPTH = 16;
const EXPECTED_CHANNELS = 1;
const RTP_HEADER_SIZE = 12; // the standard RTP header is 12 bytes

// Temporary directory (moved here from RnnoiseProvider in case both modules use it)
const TEMP_AUDIO_DIR = '/etc/asterisk/tmp';

class AriRnnoiseProcessor {
    constructor(config) {
        this.config = config;
        this.client = null;
        this.server = null;
        this.activeCalls = new Map(); // Map: channelId -> { initiatingChannel, externalMediaChannel, bridge, rnnoiseProvider }

        // NEW: map linking a remote address to its callContext
        this.remoteAddressToCallContextMap = new Map(); // Map: remoteAddress -> callContext

        console.log(chalk.cyan("AriRnnoiseProcessor: Initializing with config:"), util.inspect(config));

        // Make sure the temp directory exists
        console.log(chalk.green(`AriRnnoiseProcessor: Ensuring temp directory exists: ${TEMP_AUDIO_DIR}`));
        try {
            fs.mkdirSync(TEMP_AUDIO_DIR, { recursive: true });
        } catch (err) {
            console.error(chalk.red(`AriRnnoiseProcessor: Failed to create temp directory ${TEMP_AUDIO_DIR}: ${err.message}`));
            process.exit(1); // Critical error
        }
        }

        this.startAudioServer();
        this.connectToAri();
    }

    async connectToAri() {
        const { ariServerUrl, ariUser, ariPassword } = this.config;
        try {
            this.client = await client.connect(ariServerUrl, ariUser, ariPassword);
            console.log(chalk.green(`AriRnnoiseProcessor: Connected to Asterisk ARI at ${ariServerUrl}`));
            this.setupAriListeners();
            this.client.start('ari-rnnoise');
        } catch (err) {
            console.error(chalk.red(`AriRnnoiseProcessor: Error connecting to ARI: ${err.message}`));
            process.exit(1);
        }
    }

    setupAriListeners() {
        this.client.on('StasisStart', async (event, channel) => {
            console.log(chalk.blue(`StasisStart: Channel ${channel.id} entered Stasis application '${event.application}'.`));
            console.log(chalk.magenta(`DEBUG: event.args for Channel ${channel.id}: ${util.inspect(event.args)}`));

            console.log(chalk.bgRed.white(`DEBUG: Current activeCalls map state before processing StasisStart for ${channel.id}:`));
            if (this.activeCalls.size === 0) {
                console.log(chalk.bgRed.white(`  Map is empty.`));
            } else {
                for (const [key, value] of this.activeCalls.entries()) {
                    console.log(chalk.bgRed.white(`  Map Entry - Key (Initiating Channel ID): ${key}`));
                    if (value.initiatingChannel) {
                        console.log(chalk.bgRed.white(`    initiatingChannel ID: ${value.initiatingChannel.id}`));
                    }
                    if (value.externalMediaChannel) {
                        console.log(chalk.bgRed.white(`    externalMediaChannel ID: ${value.externalMediaChannel.id}`));
                    }
                    if (value.bridge) {
                        console.log(chalk.bgRed.white(`    Bridge ID: ${value.bridge.id}`));
                    }
                }
            }
            console.log(chalk.bgRed.white(`------------------------------------------------------------------`));

            let foundCallContextForExternalMedia = null;
            // Search activeCalls for a call context that has this channel as its externalMediaChannel
            for (const [initiatingChannelId, callContext] of this.activeCalls.entries()) {
                if (callContext.externalMediaChannel && callContext.externalMediaChannel.id === channel.id) {
                    foundCallContextForExternalMedia = callContext;
                    break;
                }
            }

            if (foundCallContextForExternalMedia) {
                console.log(chalk.green(`StasisStart: CORRECTLY DETECTED External Media Channel ${channel.id} (linked to initiating channel ${foundCallContextForExternalMedia.initiatingChannel.id}). Adding to bridge.`));
                try {
                    await foundCallContextForExternalMedia.bridge.addChannel({ channel: channel.id });
                    console.log(chalk.blue(`External Media Channel ${channel.id} added to bridge ${foundCallContextForExternalMedia.bridge.id}.`));

                } catch (err) {
                    console.error(chalk.red(`Error adding external media channel ${channel.id} to bridge ${foundCallContextForExternalMedia.bridge.id}: ${err.message}`));
                    await channel.hangup();
                    this.cleanupCall(foundCallContextForExternalMedia);
                }
            } else {
                // This is a new initiating channel
                if (this.activeCalls.has(channel.id)) {
                    const existingCallContext = this.activeCalls.get(channel.id);
                    console.warn(chalk.yellow(`StasisStart: Initiating Channel ${channel.id} already in activeCalls. This might indicate a dialplan loop or race condition. Hanging up.`));
                    await channel.hangup();
                    this.cleanupCall(existingCallContext);
                    return;
                }

                console.log(chalk.blue(`StasisStart: New initiating channel ${channel.id} detected. Creating bridge and external media.`));
                const newCallContext = { initiatingChannel: channel };
                this.activeCalls.set(channel.id, newCallContext);

                try {
                    await channel.answer();
                    await this.createBridgeAndExternalMedia(channel);
                } catch (err) {
                    console.error(chalk.red(`Error processing new channel ${channel.id}: ${err.message}`));
                    await channel.hangup();
                    this.activeCalls.delete(channel.id);
                }
            }
        });

        this.client.on('StasisEnd', (event, channel) => {
            console.log(chalk.blue(`StasisEnd: Channel ${channel.id} left Stasis application.`));
            let callContextToClean = null;
            // Find the matching call context based on the initiating or external media channel
            for (const [initiatingChannelId, callContext] of this.activeCalls.entries()) {
                if (callContext.initiatingChannel && callContext.initiatingChannel.id === channel.id) {
                    callContextToClean = callContext;
                    console.log(chalk.yellow(`Initiating channel ${channel.id} ended. Preparing cleanup.`));
                    break;
                } else if (callContext.externalMediaChannel && callContext.externalMediaChannel.id === channel.id) {
                    callContextToClean = callContext;
                    console.log(chalk.yellow(`External Media channel ${channel.id} ended. Preparing cleanup.`));
                    break;
                }
            }
            if (callContextToClean) {
                if (this.activeCalls.has(callContextToClean.initiatingChannel.id)) {
                    console.log(chalk.yellow(`Triggering cleanup for call context ${callContextToClean.initiatingChannel.id}.`));
                    this.cleanupCall(callContextToClean);
                } else {
                    console.log(chalk.gray(`StasisEnd: Call context for channel ${channel.id} already being cleaned up or not found.`));
                }
            } else {
                console.log(chalk.gray(`StasisEnd: Channel ${channel.id} not found in active calls.`));
            }
        });

        this.client.on('ChannelHangupRequest', (event, channel) => {
            console.log(chalk.blue(`ChannelHangupRequest: Channel ${channel.id} requested hangup.`));
        });

        this.client.on('ChannelDestroyed', (event, channel) => {
            console.log(chalk.blue(`ChannelDestroyed: Channel ${channel.id} was destroyed.`));
        });

        this.client.applications.subscribe({
            applicationName: 'ari-rnnoise',
            eventSource: 'channel:,bridge:'
        });
    }

    // --- startAudioServer METHOD (UDP) ---
    startAudioServer() {
        const [host, portStr] = this.config.listenServer.split(':');
        const port = parseInt(portStr);

        this.server = dgram.createSocket('udp4');

        // Map storing one RnnoiseProvider instance per remote address (Asterisk socket).
        // The rawAsteriskInputStream is now managed here as well.
        this.audioProviders = new Map(); // Map: remoteAddress (string) -> { rnnoiseProvider, rawAsteriskInputStream, lastActivityTimestamp, callContext }

        this.server.on('message', async (msg, rinfo) => {
            const remoteAddress = `${rinfo.address}:${rinfo.port}`;
            let providerContext = this.audioProviders.get(remoteAddress);

            if (!providerContext) {
                console.log(chalk.green(`UDP Server: NEW UDP stream detected from ${remoteAddress}. Initializing RnnoiseProvider.`));
                // NEW: log the expected format and frame size
                console.log(chalk.yellow(`UDP Server: Expected incoming audio format: ${EXPECTED_SAMPLE_RATE}Hz, ${EXPECTED_BIT_DEPTH}-bit, ${EXPECTED_CHANNELS} channel (slin16) with frame size: ${SLIN16_FRAME_SIZE} bytes, plus ${RTP_HEADER_SIZE} bytes for RTP header.`));

                let associatedCallContext = null;
                for (const [initiatingChannelId, cc] of this.activeCalls.entries()) {
                    if (cc.externalMediaChannel && !cc.rnnoiseProvider && !this.remoteAddressToCallContextMap.has(remoteAddress)) {
                        associatedCallContext = cc;
                        break;
                    }
                }

                if (!associatedCallContext) {
                    console.warn(chalk.yellow(`UDP Server: No unique associated call context found for ${remoteAddress}. Dropping UDP stream data for now.`));
                    if (this.audioProviders.has(remoteAddress)) {
                        this.audioProviders.delete(remoteAddress);
                    }
                    return;
                }

                const rnnoiseProvider = new RnnoiseProvider(this.config);
                associatedCallContext.rnnoiseProvider = rnnoiseProvider; // link the RnnoiseProvider to the call context
                this.remoteAddressToCallContextMap.set(remoteAddress, associatedCallContext); // NEW: map remoteAddress to callContext

                // NEW: stream for the raw Asterisk data (including RTP headers)
                const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
                const rawAsteriskInputFileName = path.join(TEMP_AUDIO_DIR, `raw-asterisk-input-${timestamp}-${remoteAddress.replace(/[:.]/g, '_')}.slin16`);
                let rawAsteriskInputStream = null;

                try {
                    rawAsteriskInputStream = fs.createWriteStream(rawAsteriskInputFileName, { flags: 'a' });
                    rawAsteriskInputStream.on('error', (err) => {
                        console.error(chalk.red(`File Stream Error (Raw Asterisk Input, ${remoteAddress}): Could not write to ${rawAsteriskInputFileName}: ${err.message}`));
                        if (rawAsteriskInputStream && !rawAsteriskInputStream.closed) {
                            rawAsteriskInputStream.end();
                        }
                    });
                    rawAsteriskInputStream.on('open', () => {
                        console.log(chalk.green(`File Stream (Raw Asterisk Input, ${remoteAddress}): Successfully opened ${rawAsteriskInputFileName} for writing.`));
                    });
                    rawAsteriskInputStream.on('close', () => {
                        console.log(chalk.green(`File Stream (Raw Asterisk Input, ${remoteAddress}): Closed ${rawAsteriskInputFileName}. Final size: ${fs.existsSync(rawAsteriskInputFileName) ? fs.statSync(rawAsteriskInputFileName).size : 'N/A'} bytes.`));
                    });
                } catch (fileErr) {
                    console.error(chalk.red(`Error creating raw asterisk input file stream for ${rawAsteriskInputFileName}: ${fileErr.message}`));
                }

                providerContext = {
                    rnnoiseProvider: rnnoiseProvider,
                    rawAsteriskInputStream: rawAsteriskInputStream, // NEW: store the stream
                    lastActivityTimestamp: Date.now(),
                    callContext: associatedCallContext,
                    rawAsteriskInputFileName: rawAsteriskInputFileName // NEW: store the file name
                };
                this.audioProviders.set(remoteAddress, providerContext);

                // NEW: send denoised audio back in smaller frames. Note that these
                // chunks go out as raw PCM without an RTP header.
                rnnoiseProvider.denoisedAudioStream.on('data', (denoisedAudioChunk) => {
                    for (let i = 0; i < denoisedAudioChunk.length; i += SLIN16_FRAME_SIZE) {
                        const chunkToSend = denoisedAudioChunk.slice(i, i + SLIN16_FRAME_SIZE);
                        this.server.send(chunkToSend, rinfo.port, rinfo.address, (err) => {
                            if (err) {
                                console.error(chalk.red(`UDP Server: Error sending denoised audio to ${remoteAddress}: ${err.message}`));
                            }
                        });
                    }
                });

                // NEW: hook RnnoiseProvider events to log file sizes
                rnnoiseProvider.on('input_file_closed', (filePath) => {
                    if (fs.existsSync(filePath)) {
                        const fileSize = fs.statSync(filePath).size;
                        console.log(chalk.green(`RnnoiseProvider (Input for RNNoise): File ${filePath} closed. Size: ${fileSize} bytes.`));
                    }
                });

                rnnoiseProvider.on('output_file_ready', (filePath) => {
                    if (fs.existsSync(filePath)) {
                        const fileSize = fs.statSync(filePath).size;
                        console.log(chalk.green(`RnnoiseProvider (Output from RNNoise): File ${filePath} ready. Size: ${fileSize} bytes.`));
                    }
                });

                rnnoiseProvider.on('denoised_wav_ready', (filePath) => {
                    if (fs.existsSync(filePath)) {
                        const fileSize = fs.statSync(filePath).size;
                        console.log(chalk.green(`RnnoiseProvider (Denoised WAV): File ${filePath} ready. Size: ${fileSize} bytes.`));
                    }
                });


            } else {
                providerContext.lastActivityTimestamp = Date.now(); // Update last activity
            }

            // NEW: write the complete UDP frame (including RTP header) to the raw Asterisk input file
            if (providerContext.rawAsteriskInputStream && providerContext.rawAsteriskInputStream.writable) {
                providerContext.rawAsteriskInputStream.write(msg);
            }

            // NEW: log the actual size of the received packet and compare it with the expected size + RTP header
            if (msg.length !== (SLIN16_FRAME_SIZE + RTP_HEADER_SIZE)) {
                console.warn(chalk.red.bold(`UDP Server: !!! UNEXPECTED PACKET SIZE !!! Received ${msg.length} bytes from ${remoteAddress}. Expected ${SLIN16_FRAME_SIZE + RTP_HEADER_SIZE} bytes (SLIN16_FRAME_SIZE + RTP_HEADER_SIZE = ${SLIN16_FRAME_SIZE + RTP_HEADER_SIZE}). This indicates a potential issue!`));
            } else {
                // console.log(chalk.gray(`UDP Server: Received expected ${msg.length} bytes (including RTP header) from ${remoteAddress}.`));
            }

            // NEW: strip the RTP header before the audio chunk is processed
            const audioChunk = msg.length >= RTP_HEADER_SIZE ? msg.slice(RTP_HEADER_SIZE) : msg;

            // Check the audio segment's size again (it should now be 640)
            if (audioChunk.length !== SLIN16_FRAME_SIZE) {
                console.error(chalk.red.bold(`UDP Server: CRITICAL: Audio chunk size after stripping RTP header is ${audioChunk.length} bytes. Expected ${SLIN16_FRAME_SIZE} bytes. This will likely cause broken audio.`));
            }

            // Write the TRIMMED chunk into the RnnoiseProvider (this is the input for RNNoise)
            if (providerContext.rnnoiseProvider && providerContext.rnnoiseProvider.incomingAudioStream.writable) {
                providerContext.rnnoiseProvider.incomingAudioStream.write(audioChunk);
            } else {
                console.warn(chalk.yellow(`UDP Server: RnnoiseProvider incomingAudioStream not writable or not initialized for ${remoteAddress}. Dropping chunk.`));
            }
        });

        this.server.on('listening', () => {
            const address = this.server.address();
            console.log(chalk.green(`UDP Audio Server: Now officially listening on ${address.address}:${address.port}`));
            console.log(chalk.green(`UDP Audio Server: Configured to receive audio in format '${this.config.format}' (expected frame size: ${SLIN16_FRAME_SIZE} bytes, plus ${RTP_HEADER_SIZE} bytes for RTP header if present).`));
        });

        this.server.on('error', (err) => {
            console.error(chalk.red(`UDP Audio Server (listening error): ${err.message}`));
            this.server.close();
            process.exit(1);
        });

        this.server.on('close', () => {
            console.log(chalk.yellow(`UDP Audio Server: Server closed.`));
            for (const [remoteAddress, context] of this.audioProviders.entries()) {
                if (context.rnnoiseProvider) {
                    context.rnnoiseProvider.stop();
                }
                if (context.rawAsteriskInputStream && !context.rawAsteriskInputStream.closed) { // NEW: close rawAsteriskInputStream
                    context.rawAsteriskInputStream.end();
                }
                console.log(chalk.gray(`Cleaned up RnnoiseProvider and file for ${remoteAddress}.`));
            }
            this.audioProviders.clear();
            this.remoteAddressToCallContextMap.clear();
        });

        this.server.bind(port, host, () => {
            console.log(chalk.green(`UDP Audio Server: Binding to ${host}:${port}`));
        });

        // Cleanup mechanism for inactive UDP streams
        setInterval(() => {
            const now = Date.now();
            for (const [remoteAddress, context] of this.audioProviders.entries()) {
                if (now - context.lastActivityTimestamp > 30000) { // 30 seconds of inactivity
                    console.log(chalk.yellow(`UDP Server: Cleaning up inactive stream from ${remoteAddress}.`));
                    if (context.rnnoiseProvider) {
                        context.rnnoiseProvider.stop();
                    }
                    if (context.rawAsteriskInputStream && !context.rawAsteriskInputStream.closed) { // NEW: close rawAsteriskInputStream
                        context.rawAsteriskInputStream.end();
                    }
                    this.audioProviders.delete(remoteAddress);
                    this.remoteAddressToCallContextMap.delete(remoteAddress);
                }
            }
        }, 10000); // check for inactivity every 10 seconds
    }
    // --- END startAudioServer METHOD (UDP) ---


    async createBridgeAndExternalMedia(initiatingChannel) {
        try {
            const bridge = this.client.Bridge();
            await bridge.create({ type: 'mixing' });
            console.log(chalk.blue(`Bridge ${bridge.id} created.`));

            await bridge.addChannel({ channel: initiatingChannel.id });
            console.log(chalk.blue(`Channel ${initiatingChannel.id} added to bridge ${bridge.id}.`));
            await initiatingChannel.unhold();

            const callContext = this.activeCalls.get(initiatingChannel.id);
            if (callContext) {
                callContext.bridge = bridge;
            } else {
                console.error(chalk.red(`Error: Call context not found for initiating channel ${initiatingChannel.id}.`));
                await initiatingChannel.hangup();
                return;
            }

            const externalMediaChannel = this.client.Channel();

            if (callContext) {
                callContext.externalMediaChannel = externalMediaChannel;
                console.log(chalk.blue(`External Media Channel ${externalMediaChannel.id} added to activeCalls map for initiating channel ${initiatingChannel.id} (PRE-EMPTIVE).`));
            } else {
                console.error(chalk.red(`Critical Error: Call context disappeared during external media channel creation setup for ${initiatingChannel.id}. Aborting.`));
                await externalMediaChannel.hangup();
                await initiatingChannel.hangup();
                return;
            }

            console.log(chalk.magenta(`DEBUG: Calling externalMedia with format: '${this.config.format}', external_host: '${this.config.listenServer}'`));
            await externalMediaChannel.externalMedia({
                app: 'ari-rnnoise',
                format: this.config.format,
                external_host: this.config.listenServer,
                media_direction: 'both'
            });
            console.log(chalk.blue(`External Media Channel ${externalMediaChannel.id} created, connecting to ${this.config.listenServer}.`));

        } catch (err) {
            console.error(chalk.red(`AriRnnoiseProcessor: Error in createBridgeAndExternalMedia for channel ${initiatingChannel.id}: ${err.message}`));
            const callContextToCleanup = this.activeCalls.get(initiatingChannel.id);
            if (callContextToCleanup) {
                this.cleanupCall(callContextToCleanup);
            }
            if (initiatingChannel && initiatingChannel.state !== 'Down') {
                await initiatingChannel.hangup();
            }
        }
    }

    async cleanupCall(callContext) {
        if (!callContext) {
            return;
        }

        if (this.activeCalls.has(callContext.initiatingChannel.id)) {
            this.activeCalls.delete(callContext.initiatingChannel.id);
            console.log(chalk.yellow(`Cleaning up call context for channel ${callContext.initiatingChannel.id}.`));
        } else {
            console.log(chalk.gray(`Cleanup already in progress or call context for ${callContext.initiatingChannel.id} not found.`));
            return;
        }

        try {
            if (callContext.rnnoiseProvider && callContext.rnnoiseProvider.incomingAudioStream) {
                 console.log(chalk.blue(`RnnoiseProcessor: Ending RnnoiseProvider incomingAudioStream for channel ${callContext.initiatingChannel.id}.`));
                 callContext.rnnoiseProvider.incomingAudioStream.end();
            }

            if (callContext.rawAsteriskInputStream && !callContext.rawAsteriskInputStream.closed) { // NEW: close rawAsteriskInputStream
                console.log(chalk.blue(`RnnoiseProcessor: Ending rawAsteriskInputStream for channel ${callContext.initiatingChannel.id}.`));
                callContext.rawAsteriskInputStream.end();
            }


            if (callContext.initiatingChannel && callContext.initiatingChannel.state !== 'Down') {
                try {
                    console.log(chalk.yellow(`Hanging up initiating channel ${callContext.initiatingChannel.id}.`));
                    await callContext.initiatingChannel.hangup();
                } catch (hangupErr) {
                    if (hangupErr.message !== 'Channel not found') {
                        console.error(chalk.red(`Error hanging up initiating channel ${callContext.initiatingChannel.id}: ${hangupErr.message}`));
                    } else {
                        console.log(chalk.gray(`Initiating channel ${callContext.initiatingChannel.id} already hung up.`));
                    }
                }
            }
            if (callContext.externalMediaChannel && callContext.externalMediaChannel.state !== 'Down') {
                try {
                    console.log(chalk.yellow(`Hanging up external media channel ${callContext.externalMediaChannel.id}.`));
                    await callContext.externalMediaChannel.hangup();
                } catch (hangupErr) {
                    if (hangupErr.message !== 'Channel not found') {
                        console.error(chalk.red(`Error hanging up external media channel ${callContext.externalMediaChannel.id}: ${hangupErr.message}`));
                    } else {
                        console.log(chalk.gray(`External media channel ${callContext.externalMediaChannel.id} already hung up.`));
                    }
                }
            }
            if (callContext.bridge) {
                try {
                    console.log(chalk.yellow(`Destroying bridge ${callContext.bridge.id}.`));
                    await callContext.bridge.destroy();
                } catch (destroyErr) {
                    if (destroyErr.message !== 'Bridge not found') {
                        console.error(chalk.red(`Error destroying bridge ${callContext.bridge.id}: ${destroyErr.message}`));
                    } else {
                        console.log(chalk.gray(`Bridge ${callContext.bridge.id} already destroyed.`));
                    }
                }
            }

            for (const [remoteAddress, cc] of this.remoteAddressToCallContextMap.entries()) {
                if (cc === callContext) {
                    console.log(chalk.gray(`Removing ${remoteAddress} from remoteAddressToCallContextMap during cleanup.`));
                    this.remoteAddressToCallContextMap.delete(remoteAddress);
                    if (this.audioProviders.has(remoteAddress)) {
                        const providerContext = this.audioProviders.get(remoteAddress);
                        if (providerContext.rawAsteriskInputStream && !providerContext.rawAsteriskInputStream.closed) { // NEW: close rawAsteriskInputStream
                            providerContext.rawAsteriskInputStream.end();
                        }
                        this.audioProviders.delete(remoteAddress);
                        console.log(chalk.gray(`Removed ${remoteAddress} from audioProviders map.`));
                    }
                    break;
                }
            }

        } catch (err) {
            console.error(chalk.red(`Error during cleanup: ${err.message}`));
        } finally {
            // The files are cleaned up by the RnnoiseProvider itself when it is stopped.
        }
    }
}

module.exports.AriRnnoiseProcessor = AriRnnoiseProcessor;

rnnoise-provider.js:

// rnnoise-provider.js
const { spawn } = require('child_process');
const { Readable, Writable } = require('stream');
const chalk = require('chalk');
const fs = require('fs');
const crypto = require('crypto');
const path = require('path');
const EventEmitter = require('events');

// Temporary directory where the audio files are stored
// NEW: TEMP_AUDIO_DIR is now defined in ari-rnnoise-processor.js and should be global
const TEMP_AUDIO_DIR = '/etc/asterisk/tmp';

// RNNoise is designed for 16 kHz, 16-bit mono.
const EXPECTED_RNNOISE_SAMPLE_RATE = 16000;
const EXPECTED_RNNOISE_BIT_DEPTH = 16;
const EXPECTED_RNNOISE_CHANNELS = 1;

class RnnoiseProvider extends EventEmitter {
    constructor(options) {
        super();
        this.options = options;
        this.rnnoiseProcess = null;

        // File paths are determined only at start
        this.inputFilePath = null; // input-for-rnnoise-<timestamp>.slin16
        this.outputFilePath = null; // output-from-rnnoise-<timestamp>.slin16
        this.outputWavPath = null; // output-denoised-<timestamp>.wav
        this.inputFileStream = null;

        this.incomingAudioStream = new Writable({
            write: (chunk, encoding, callback) => {
                if (this.inputFileStream && this.inputFileStream.writable) {
                    this.inputFileStream.write(chunk, callback);
                } else {
                    console.error(chalk.red(`RnnoiseProvider: Input file stream is not writable or not open. Dropping chunk.`));
                    callback(new Error("Input file stream not writable"));
                }
            },
            final: (callback) => {
                if (this.inputFileStream && !this.inputFileStream.closed) {
                    this.inputFileStream.end(() => {
                        console.log(chalk.green(`RnnoiseProvider: Input file stream ended. File: ${this.inputFilePath}`));
                        this.emit('input_file_closed', this.inputFilePath);
                        // Process the audio file once the input stream is fully written
                        this.processAudioFile();
                        callback();
                    });
                } else {
                    callback();
                }
            }
        });

        this.denoisedAudioStream = new Readable({
            read() {}
        });

        this.start();
    }

    start() {
        // Make sure the temp directory exists (redundant, but just to be safe)
        try {
            fs.mkdirSync(TEMP_AUDIO_DIR, { recursive: true });
        } catch (err) {
            console.error(chalk.red(`RnnoiseProvider: Failed to create temp directory ${TEMP_AUDIO_DIR}: ${err.message}`));
            return;
        }

        const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
        const uniqueId = crypto.randomBytes(8).toString('hex'); // for extra uniqueness

        // NEW: file names following the naming convention
        this.inputFilePath = path.join(TEMP_AUDIO_DIR, `input-for-rnnoise-${timestamp}-${uniqueId}.slin16`);
        this.outputFilePath = path.join(TEMP_AUDIO_DIR, `output-from-rnnoise-${timestamp}-${uniqueId}.slin16`);
        this.outputWavPath = path.join(TEMP_AUDIO_DIR, `output-denoised-${timestamp}-${uniqueId}.wav`);


        console.log(chalk.green(`RnnoiseProvider: Opening input file stream: ${this.inputFilePath}`));
        this.inputFileStream = fs.createWriteStream(this.inputFilePath);
        this.inputFileStream.on('error', (err) => {
            console.error(chalk.red(`RnnoiseProvider: Input file stream error for ${this.inputFilePath}: ${err.message}`));
            this.stop();
        });
        console.log(chalk.blue(`RnnoiseProvider: Expected rnnoise_demo input format: ${EXPECTED_RNNOISE_SAMPLE_RATE}Hz, ${EXPECTED_RNNOISE_BIT_DEPTH}-bit, ${EXPECTED_RNNOISE_CHANNELS} channel (slin16).`));
    }

    async processAudioFile() {
        if (!this.inputFilePath || !fs.existsSync(this.inputFilePath)) {
            console.warn(chalk.yellow(`RnnoiseProvider: Cannot process audio, input file ${this.inputFilePath} does not exist.`));
            this.denoisedAudioStream.push(null);
            this.cleanupFiles(); // clean up, since there is nothing to process
            return;
        }

        const rnnoisePath = '/etc/asterisk/scripts/rnnoise/examples/rnnoise_demo';
        console.log(chalk.blue(`RnnoiseProvider: Starting rnnoise_demo process for file-based processing.`));
        console.log(chalk.gray(`RnnoiseProvider: Executing command: ${rnnoisePath} ${this.inputFilePath} ${this.outputFilePath}`));

        try {
            this.rnnoiseProcess = spawn(rnnoisePath, [this.inputFilePath, this.outputFilePath]);

            this.rnnoiseProcess.on('close', (code) => {
                console.log(chalk.blue(`RnnoiseProvider: rnnoise_demo process finished with code ${code}.`));
                if (code === 0) {
                    this.readProcessedAudio();
                } else {
                    console.error(chalk.red(`RnnoiseProvider: rnnoise_demo process exited with error code ${code}.`));
                    this.denoisedAudioStream.push(null);
                    this.cleanupFiles(); // clean up, since processing failed
                }
            });

            this.rnnoiseProcess.on('error', (err) => {
                console.error(chalk.red(`RnnoiseProvider: Failed to start rnnoise_demo process: ${err.message}`));
                if (err.code === 'ENOENT') {
                    console.error(chalk.red.bold(`RnnoiseProvider: Make sure '${rnnoisePath}' exists and is executable.`));
                }
                this.denoisedAudioStream.push(null);
                this.cleanupFiles(); // clean up on error
            });

            this.rnnoiseProcess.stderr.on('data', (data) => {
                console.error(chalk.red(`RnnoiseProvider (rnnoise_demo stderr): ${data.toString().trim()}`));
            });

        } catch (err) {
            console.error(chalk.red(`RnnoiseProvider: Error spawning rnnoise_demo: ${err.message}`));
            this.denoisedAudioStream.push(null);
            this.cleanupFiles(); // clean up on error
        }
    }

    readProcessedAudio() {
        if (!this.outputFilePath || !fs.existsSync(this.outputFilePath)) {
            console.warn(chalk.yellow(`RnnoiseProvider: Processed output file ${this.outputFilePath} not found.`));
            this.denoisedAudioStream.push(null);
            this.cleanupFiles();
            return;
        }

        console.log(chalk.green(`RnnoiseProvider: Reading denoised audio from ${this.outputFilePath}`));
        this.emit('output_file_ready', this.outputFilePath);

        // NEW: convert the SLIN16 output file to WAV
        this.convertSlin16ToWav(this.outputFilePath, this.outputWavPath)
            .then(() => {
                console.log(chalk.green(`RnnoiseProvider: Successfully converted denoised output to WAV: ${this.outputWavPath}`));
                this.emit('denoised_wav_ready', this.outputWavPath);
            })
            .catch((err) => {
                console.error(chalk.red(`RnnoiseProvider: Failed to convert denoised output to WAV: ${err.message}`));
            });

        const outputReadStream = fs.createReadStream(this.outputFilePath);

        outputReadStream.on('data', (chunk) => {
            this.denoisedAudioStream.push(chunk);
        });

        outputReadStream.on('end', () => {
            console.log(chalk.green(`RnnoiseProvider: Finished reading denoised audio from ${this.outputFilePath}.`));
            this.denoisedAudioStream.push(null); // signals the end of the stream
            // cleanupFiles() now runs after the stream has ended,
            // but we keep the WAV file for debugging.
            this.cleanupFiles();
        });

        outputReadStream.on('error', (err) => {
            console.error(chalk.red(`RnnoiseProvider: Error reading processed output file ${this.outputFilePath}: ${err.message}`));
            this.denoisedAudioStream.push(null);
            this.cleanupFiles();
        });
    }

    async convertSlin16ToWav(inputPath, outputPath) {
        return new Promise((resolve, reject) => {
            console.log(chalk.blue(`RnnoiseProvider: Converting ${inputPath} to WAV: ${outputPath}`));
            const ffmpeg = spawn('ffmpeg', [
                '-f', 's16le',
                '-ar', '8000', // NOTE: declares the raw input as 8 kHz; for genuine slin16 this would be 16000
                '-ac', '1',
                '-i', inputPath,
                outputPath
            ]);

            ffmpeg.stderr.on('data', (data) => {
                // Log ffmpeg stderr only in debug mode or on actual errors
                // console.error(chalk.gray(`FFmpeg stderr: ${data.toString()}`));
            });

            ffmpeg.on('close', (code) => {
                if (code === 0) {
                    resolve();
                } else {
                    reject(new Error(`FFmpeg exited with code ${code}`));
                }
            });

            ffmpeg.on('error', (err) => {
                if (err.code === 'ENOENT') {
                    console.error(chalk.red.bold(`FFmpeg not found. Make sure 'ffmpeg' is installed and in your PATH.`));
                }
                reject(err);
            });
        });
    }

    stop() {
        console.log(chalk.blue(`RnnoiseProvider: Stopping.`));
        if (this.rnnoiseProcess) {
            this.rnnoiseProcess.kill();
            this.rnnoiseProcess = null;
        }
        if (this.inputFileStream && !this.inputFileStream.closed) {
            this.inputFileStream.end();
            this.inputFileStream = null;
        }
        this.denoisedAudioStream.push(null);
        this.cleanupFiles();
    }

    cleanupFiles() {
        // Old logic kept for reference: deleting the .slin16 files
        // if (this.inputFilePath && fs.existsSync(this.inputFilePath)) {
        //     try {
        //         fs.unlinkSync(this.inputFilePath);
        //         console.log(chalk.gray(`RnnoiseProvider: Cleaned up input file: ${this.inputFilePath}`));
        //     } catch (err) {
        //         console.warn(chalk.yellow(`RnnoiseProvider: Could not delete input file ${this.inputFilePath}: ${err.message}`));
        //     }
        // }
        // if (this.outputFilePath && fs.existsSync(this.outputFilePath)) {
        //     try {
        //         fs.unlinkSync(this.outputFilePath);
        //         console.log(chalk.gray(`RnnoiseProvider: Cleaned up output file: ${this.outputFilePath}`));
        //     } catch (err) {
        //         console.warn(chalk.yellow(`RnnoiseProvider: Could not delete output file ${this.outputFilePath}: ${err.message}`));
        //     }
        // }

        // NEW: these files are now KEPT FOR DEBUGGING
        if (this.inputFilePath && fs.existsSync(this.inputFilePath)) {
            console.log(chalk.yellow(`RnnoiseProvider: Keeping input SLIN16 file for review: ${this.inputFilePath}`));
        }
        if (this.outputFilePath && fs.existsSync(this.outputFilePath)) {
            console.log(chalk.yellow(`RnnoiseProvider: Keeping output SLIN16 file for review: ${this.outputFilePath}`));
        }

        // The WAV file is NOT deleted automatically, to make debugging easier.
        if (this.outputWavPath && fs.existsSync(this.outputWavPath)) {
            console.log(chalk.yellow(`RnnoiseProvider: Keeping WAV file for review: ${this.outputWavPath}`));
        }
    }
}

module.exports.RnnoiseProvider = RnnoiseProvider;

Why do you think it would do this? Arguments are passed as-is to the ARI application. The ARI application is what has to understand/interpret/use them. If your ARI application isn’t doing that, then it will not behave as you think.

Your code seems to use “this.config.format” to determine the format. How this is set/manipulated/etc I don’t know.
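
For illustration, this is roughly what "interpreting the argument yourself" looks like on the application side (a sketch; the parsing is hypothetical, and nothing happens automatically):

// Sketch: Stasis() arguments arrive untouched in event.args;
// the application itself has to give them meaning.
client.on('StasisStart', (event, channel) => {
    console.log(event.args); // e.g. [ 'c(slin16)' ] -- just an opaque string
    const match = /^c\((.+)\)$/.exec(event.args[0] || '');
    const requestedFormat = match ? match[1] : 'ulaw';
    // The app would then have to act on it, e.g. as the 'format'
    // parameter of its externalMedia() call.
});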

Additionally, if you’ve used AI to write this, it has lied to you.


Loud noise with externalMedia (slin16) and Node.js often comes from mismatched audio formats or sample rates. Make sure:

  • Sample rate matches (e.g., 16kHz for slin16)
  • Correct endianness and bit depth
  • No extra headers or corrupted stream data

Check your Node.js audio processing pipeline for format consistency.
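
One quick way to check the first and third points is to log the RTP payload type of the incoming packets (a sketch against the UDP server above; dynamic payload type numbers vary by configuration):

// Sketch: read the RTP payload type to see what Asterisk actually sends.
// Byte 1 of the RTP header carries the marker bit plus a 7-bit payload type.
server.on('message', (msg) => {
    const payloadType = msg[1] & 0x7f;    // 0 = PCMU (ulaw), 8 = PCMA (alaw)
    const payloadBytes = msg.length - 12; // minus the fixed RTP header
    console.log(`PT=${payloadType}, payload=${payloadBytes} bytes`);
    // PT 0 with 160-byte payloads means 8 kHz ulaw; slin16 would show a
    // dynamic payload type (typically 96-127) with 640-byte payloads.
});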

Thank you for your responses.

I had some issues sampling the audio, passing it to RNNoise, and feeding it back into the Asterisk stream.

One of the main problems I had in Node was that RNNoise operated at the file level and not in real time. That’s why I find the C approach much better, because:

a) no sockets need to be created
b) the latency is lower since Asterisk handles the process itself, e.g., sampling or processing the chunks

That’s why I found a more effective solution.

I have now written C code based on RNNoise, and it works excellently. However, the code is still in development, as I am currently running some tests. The audio processing is now handled directly by the codec_resample function!

At the moment, I can control RNNoise via the Asterisk dialplan like this:

same => n,Set(RNNOISE(rx)=on)  
;same => n,Set(RNNOISE(tx)=on)  
same => n,Set(RNNOISE(rx,noiselevel)=1) ; 1-15 CNG

There is hardly any latency, but I will continue to optimize the code where possible and then release it. Background noises like a running TV or traffic are no longer audible. This solves the problem with my Cisco phone, which runs the SIP firmware and doesn’t support noise reduction (except in the 3PCC/MPP firmware).

I assume you mean this RNNoise. Were you using the demo app, which wants input and output file names? Given that it wants raw-format audio, it should work with pipes as well, to allow for real-time response.
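
Something along these lines might be a starting point (an untested sketch, assuming rnnoise_demo does plain sequential reads/writes on its two file arguments and that mkfifo is available):

// Sketch: drive rnnoise_demo through named pipes instead of temp files.
const { spawn, execSync } = require('child_process');
const fs = require('fs');

const inFifo = '/tmp/rnnoise-in.fifo';
const outFifo = '/tmp/rnnoise-out.fifo';
execSync(`mkfifo ${inFifo} ${outFifo}`); // fails if the FIFOs already exist

// rnnoise_demo opens inFifo for reading and outFifo for writing.
const proc = spawn('/etc/asterisk/scripts/rnnoise/examples/rnnoise_demo', [inFifo, outFifo]);

// Open our ends asynchronously; FIFO opens block until the other side opens too.
const denoised = fs.createReadStream(outFifo); // denoised PCM frames come out here
const raw = fs.createWriteStream(inFifo);      // write raw audio frames in here

denoised.on('data', (chunk) => { /* forward to Asterisk */ });
// raw.write(audioChunk) per incoming frame; raw.end() on hangup lets the process finish.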

Also, looking at the include file, the API looks very simple: simple enough to wrap in a fairly small ctypes-based high-level Python wrapper.