// ari-rnnoise-processor.js
// Importiere notwendige Module
const client = require('ari-client');
const dgram = require('dgram');
const chalk = require('chalk');
const util = require('util');
const fs = require('fs');
const path = require('path'); // NEU: path importieren
// Importiere unsere neue RnnoiseProvider-Klasse
const { RnnoiseProvider } = require('./rnnoise-provider');
// Audio format Asterisk is expected to send over the external media socket (slin16).
const EXPECTED_SAMPLE_RATE = 16000; // Hz
const EXPECTED_BIT_DEPTH = 16; // bits per sample
const EXPECTED_CHANNELS = 1; // mono
// Amount of audio carried by one RTP packet.
const FRAME_DURATION_MS = 20;
// Payload bytes per frame, derived from the format above with integer arithmetic:
// 16000 Hz * 20 ms / 1000 * 2 bytes/sample * 1 channel = 640 bytes.
const SLIN16_FRAME_SIZE =
  ((EXPECTED_SAMPLE_RATE * FRAME_DURATION_MS) / 1000) * (EXPECTED_BIT_DEPTH / 8) * EXPECTED_CHANNELS;
// Size of the fixed RTP header without CSRC entries or header extension (RFC 3550, section 5.1).
const RTP_HEADER_SIZE = 12;
// Directory for temporary audio dumps (raw Asterisk input files are written here).
const TEMP_AUDIO_DIR = '/etc/asterisk/tmp';
/**
 * Routes Asterisk calls through RNNoise using ARI external media.
 *
 * As implemented below:
 *  1. A channel entering the 'ari-rnnoise' Stasis application is answered and placed in
 *     a new mixing bridge.
 *  2. An external media channel pointed at `config.listenServer` is created; when its own
 *     StasisStart arrives it is added to the same bridge.
 *  3. RTP packets arriving on the UDP server are stripped of their RTP header and written
 *     into a per-stream RnnoiseProvider. The provider's denoised output is cut into
 *     SLIN16_FRAME_SIZE frames, wrapped in fresh RTP headers and sent back to the sender.
 *
 * Config fields read by this class: ariServerUrl, ariUser, ariPassword, listenServer
 * ("host:port"), format (external media format name). The whole config object is also
 * handed to every RnnoiseProvider.
 */
class AriRnnoiseProcessor {
  /**
   * Ensures the temp directory exists, starts the UDP audio server and connects to ARI.
   * Exits the process if the temp directory cannot be created.
   * @param {object} config - See the class documentation for the fields used.
   */
  constructor(config) {
    this.config = config;
    this.client = null;
    this.server = null;
    // initiating channel id -> { initiatingChannel, externalMediaChannel, bridge, rnnoiseProvider }
    this.activeCalls = new Map();
    // "ip:port" of an Asterisk RTP stream -> call context that stream was associated with
    this.remoteAddressToCallContextMap = new Map();
    // Interval handle of the inactive-stream sweep; cleared when the UDP server closes.
    this.inactivityTimer = null;
    // Do not print the ARI password in clear text.
    const printableConfig = config && config.ariPassword !== undefined
      ? { ...config, ariPassword: '***' }
      : config;
    console.log(chalk.cyan('AriRnnoiseProcessor: Initializing with config:'), util.inspect(printableConfig));
    console.log(chalk.green(`AriRnnoiseProcessor: Ensuring temp directory exists: ${TEMP_AUDIO_DIR}`));
    try {
      fs.mkdirSync(TEMP_AUDIO_DIR, { recursive: true });
    } catch (err) {
      console.error(chalk.red(`AriRnnoiseProcessor: Failed to create temp directory ${TEMP_AUDIO_DIR}: ${err.message}`));
      process.exit(1); // Critical error: the raw audio dumps are written to this directory.
    }
    this.startAudioServer();
    // connectToAri reports its own failures (and exits), so its promise is not awaited here.
    this.connectToAri();
  }

  /**
   * Connects to ARI, registers the event listeners and starts the 'ari-rnnoise' app.
   * Exits the process on any failure.
   * @returns {Promise<void>}
   */
  async connectToAri() {
    const { ariServerUrl, ariUser, ariPassword } = this.config;
    try {
      this.client = await client.connect(ariServerUrl, ariUser, ariPassword);
      console.log(chalk.green(`AriRnnoiseProcessor: Connected to Asterisk ARI at ${ariServerUrl}`));
      this.setupAriListeners();
      // Awaited so a failing start is handled here instead of becoming an unhandled rejection.
      await this.client.start('ari-rnnoise');
    } catch (err) {
      console.error(chalk.red(`AriRnnoiseProcessor: Error connecting to ARI: ${err.message}`));
      process.exit(1);
    }
  }

  /**
   * Registers the ARI event handlers and subscribes the application to channel and
   * bridge events. Handler failures are logged instead of escaping as rejections.
   */
  setupAriListeners() {
    this.client.on('StasisStart', (event, channel) => {
      this.handleStasisStart(event, channel).catch((err) => {
        console.error(chalk.red(`StasisStart: Unexpected error while handling channel ${channel.id}: ${err.message}`));
      });
    });
    this.client.on('StasisEnd', (event, channel) => {
      this.handleStasisEnd(channel);
    });
    this.client.on('ChannelHangupRequest', (event, channel) => {
      console.log(chalk.blue(`ChannelHangupRequest: Channel ${channel.id} requested hangup.`));
    });
    this.client.on('ChannelDestroyed', (event, channel) => {
      console.log(chalk.blue(`ChannelDestroyed: Channel ${channel.id} was destroyed.`));
    });
    Promise.resolve(
      this.client.applications.subscribe({
        applicationName: 'ari-rnnoise',
        eventSource: 'channel:,bridge:'
      })
    ).catch((err) => {
      console.error(chalk.red(`AriRnnoiseProcessor: Failed to subscribe 'ari-rnnoise' to channel/bridge events: ${err.message}`));
    });
  }

  /** Debug dump of the activeCalls map, printed before each StasisStart is processed. */
  logActiveCalls(channelId) {
    console.log(chalk.bgRed.white(`DEBUG: Current activeCalls map state before processing StasisStart for ${channelId}:`));
    if (this.activeCalls.size === 0) {
      console.log(chalk.bgRed.white(` Map is empty.`));
    }
    for (const [key, value] of this.activeCalls) {
      console.log(chalk.bgRed.white(` Map Entry - Key (Initiating Channel ID): ${key}`));
      if (value.initiatingChannel) {
        console.log(chalk.bgRed.white(` initiatingChannel ID: ${value.initiatingChannel.id}`));
      }
      if (value.externalMediaChannel) {
        console.log(chalk.bgRed.white(` externalMediaChannel ID: ${value.externalMediaChannel.id}`));
      }
      if (value.bridge) {
        console.log(chalk.bgRed.white(` Bridge ID: ${value.bridge.id}`));
      }
    }
    console.log(chalk.bgRed.white(`------------------------------------------------------------------`));
  }

  /** Returns the call context whose external media channel has `channelId`, or null. */
  findCallContextByExternalMediaChannelId(channelId) {
    for (const callContext of this.activeCalls.values()) {
      if (callContext.externalMediaChannel && callContext.externalMediaChannel.id === channelId) {
        return callContext;
      }
    }
    return null;
  }

  /**
   * Handles a channel entering Stasis. A channel that is the pre-registered external media
   * channel of an active call is added to that call's bridge; any other channel starts a
   * new call (answer, bridge, external media).
   * @returns {Promise<void>}
   */
  async handleStasisStart(event, channel) {
    console.log(chalk.blue(`StasisStart: Channel ${channel.id} entered Stasis application '${event.application}'.`));
    console.log(chalk.magenta(`DEBUG: event.args for Channel ${channel.id}: ${util.inspect(event.args)}`));
    this.logActiveCalls(channel.id);

    const externalMediaContext = this.findCallContextByExternalMediaChannelId(channel.id);
    if (externalMediaContext) {
      console.log(chalk.green(`StasisStart: CORRECTLY DETECTED External Media Channel ${channel.id} (linked to initiating channel ${externalMediaContext.initiatingChannel.id}). Adding to bridge.`));
      try {
        await externalMediaContext.bridge.addChannel({ channel: channel.id });
        console.log(chalk.blue(`External Media Channel ${channel.id} added to bridge ${externalMediaContext.bridge.id}.`));
      } catch (err) {
        console.error(chalk.red(`Error adding external media channel ${channel.id} to bridge ${externalMediaContext.bridge.id}: ${err.message}`));
        await this.safeHangup(channel, 'external media channel');
        await this.cleanupCall(externalMediaContext);
      }
      return;
    }

    // Any other channel is a new initiating channel.
    const existingCallContext = this.activeCalls.get(channel.id);
    if (existingCallContext) {
      console.warn(chalk.yellow(`StasisStart: Initiating Channel ${channel.id} already in activeCalls. This might indicate a dialplan loop or race condition. Hanging up.`));
      await this.safeHangup(channel, 'initiating channel');
      await this.cleanupCall(existingCallContext);
      return;
    }

    console.log(chalk.blue(`StasisStart: New initiating channel ${channel.id} detected. Creating bridge and external media.`));
    const newCallContext = { initiatingChannel: channel };
    this.activeCalls.set(channel.id, newCallContext);
    try {
      await channel.answer();
      await this.createBridgeAndExternalMedia(channel);
    } catch (err) {
      console.error(chalk.red(`Error processing new channel ${channel.id}: ${err.message}`));
      await this.safeHangup(channel, 'initiating channel');
      // Removes the activeCalls entry and tears down anything created so far.
      await this.cleanupCall(newCallContext);
    }
  }

  /**
   * Handles a channel leaving Stasis: if it is the initiating or external media channel
   * of an active call, the whole call is cleaned up.
   */
  handleStasisEnd(channel) {
    console.log(chalk.blue(`StasisEnd: Channel ${channel.id} left Stasis application.`));
    let callContextToClean = null;
    for (const callContext of this.activeCalls.values()) {
      if (callContext.initiatingChannel && callContext.initiatingChannel.id === channel.id) {
        callContextToClean = callContext;
        console.log(chalk.yellow(`Initiating channel ${channel.id} ended. Preparing cleanup.`));
        break;
      }
      if (callContext.externalMediaChannel && callContext.externalMediaChannel.id === channel.id) {
        callContextToClean = callContext;
        console.log(chalk.yellow(`External Media channel ${channel.id} ended. Preparing cleanup.`));
        break;
      }
    }
    if (!callContextToClean) {
      console.log(chalk.gray(`StasisEnd: Channel ${channel.id} not found in active calls.`));
      return;
    }
    if (!this.activeCalls.has(callContextToClean.initiatingChannel.id)) {
      console.log(chalk.gray(`StasisEnd: Call context for channel ${channel.id} already being cleaned up or not found.`));
      return;
    }
    console.log(chalk.yellow(`Triggering cleanup for call context ${callContextToClean.initiatingChannel.id}.`));
    this.cleanupCall(callContextToClean).catch((err) => {
      console.error(chalk.red(`Error during cleanup: ${err.message}`));
    });
  }

  /**
   * Starts the UDP server that receives RTP from Asterisk external media channels and
   * sends denoised RTP back. Streams idle for more than 30 s are released by a sweep that
   * runs every 10 s. Exits the process on an invalid listen address or a socket error.
   */
  startAudioServer() {
    const [host, portStr] = this.config.listenServer.split(':');
    const port = Number.parseInt(portStr, 10);
    if (Number.isNaN(port)) {
      console.error(chalk.red(`UDP Audio Server: Invalid listenServer '${this.config.listenServer}', expected "host:port".`));
      process.exit(1);
    }
    this.server = dgram.createSocket('udp4');
    // remoteAddress ("ip:port") -> { rnnoiseProvider, rawAsteriskInputStream, lastActivityTimestamp,
    //                                callContext, rawAsteriskInputFileName, rtpOut }
    this.audioProviders = new Map();

    this.server.on('message', (msg, rinfo) => {
      const remoteAddress = `${rinfo.address}:${rinfo.port}`;
      let providerContext = this.audioProviders.get(remoteAddress);
      if (providerContext) {
        providerContext.lastActivityTimestamp = Date.now();
      } else {
        providerContext = this.createProviderContext(remoteAddress, rinfo);
        if (!providerContext) {
          return; // No call to attach this stream to yet; the packet is dropped.
        }
      }
      this.handleIncomingRtpPacket(providerContext, msg, remoteAddress);
    });

    this.server.on('listening', () => {
      const address = this.server.address();
      console.log(chalk.green(`UDP Audio Server: Now officially listening on ${address.address}:${address.port}`));
      console.log(chalk.green(`UDP Audio Server: Configured to receive audio in format '${this.config.format}' (expected frame size: ${SLIN16_FRAME_SIZE} bytes, plus ${RTP_HEADER_SIZE} bytes for RTP header if present).`));
    });

    this.server.on('error', (err) => {
      console.error(chalk.red(`UDP Audio Server (listening error): ${err.message}`));
      try {
        this.server.close();
      } catch (closeErr) {
        console.error(chalk.red(`UDP Audio Server: Error while closing socket: ${closeErr.message}`));
      }
      process.exit(1);
    });

    this.server.on('close', () => {
      console.log(chalk.yellow(`UDP Audio Server: Server closed.`));
      clearInterval(this.inactivityTimer);
      for (const [remoteAddress, context] of this.audioProviders) {
        if (context.rnnoiseProvider) {
          context.rnnoiseProvider.stop();
        }
        AriRnnoiseProcessor.endRawDump(context);
        console.log(chalk.gray(`Cleaned up RnnoiseProvider and file for ${remoteAddress}.`));
      }
      this.audioProviders.clear();
      this.remoteAddressToCallContextMap.clear();
    });

    this.server.bind(port, host, () => {
      console.log(chalk.green(`UDP Audio Server: Binding to ${host}:${port}`));
    });

    // Release streams that have been silent for more than 30 seconds; checked every 10 seconds.
    this.inactivityTimer = setInterval(() => {
      const now = Date.now();
      for (const [remoteAddress, context] of this.audioProviders) {
        if (now - context.lastActivityTimestamp <= 30000) {
          continue;
        }
        console.log(chalk.yellow(`UDP Server: Cleaning up inactive stream from ${remoteAddress}.`));
        if (context.rnnoiseProvider) {
          context.rnnoiseProvider.stop();
        }
        AriRnnoiseProcessor.endRawDump(context);
        this.audioProviders.delete(remoteAddress); // deleting during Map iteration is safe
        this.remoteAddressToCallContextMap.delete(remoteAddress);
      }
    }, 10000);
  }

  /**
   * Creates the per-stream state for a new remote address: associates it with a call,
   * creates its RnnoiseProvider and raw-input dump, and wires the denoised output back to
   * the sender. Returns null (and logs) when no call is waiting for a stream.
   */
  createProviderContext(remoteAddress, rinfo) {
    console.log(chalk.green(`UDP Server: NEW UDP stream detected from ${remoteAddress}. Initializing RnnoiseProvider.`));
    console.log(chalk.yellow(`UDP Server: Expected incoming audio format: ${EXPECTED_SAMPLE_RATE}Hz, ${EXPECTED_BIT_DEPTH}-bit, ${EXPECTED_CHANNELS} channel (slin16) with frame size: ${SLIN16_FRAME_SIZE} bytes, plus ${RTP_HEADER_SIZE} bytes for RTP header.`));

    // Heuristic: take the first call that has an external media channel but no provider yet.
    // NOTE(review): two calls set up at the same time could be matched to the wrong stream;
    // matching on the external media channel's local RTP address would be exact.
    let associatedCallContext = null;
    if (!this.remoteAddressToCallContextMap.has(remoteAddress)) {
      for (const cc of this.activeCalls.values()) {
        if (cc.externalMediaChannel && !cc.rnnoiseProvider) {
          associatedCallContext = cc;
          break;
        }
      }
    }
    if (!associatedCallContext) {
      console.warn(chalk.yellow(`UDP Server: No unique associated call context found for ${remoteAddress}. Dropping UDP stream data for now.`));
      return null;
    }

    const rnnoiseProvider = new RnnoiseProvider(this.config);
    associatedCallContext.rnnoiseProvider = rnnoiseProvider;
    this.remoteAddressToCallContextMap.set(remoteAddress, associatedCallContext);

    const { stream: rawAsteriskInputStream, fileName: rawAsteriskInputFileName } = this.openRawInputDump(remoteAddress);

    const providerContext = {
      rnnoiseProvider,
      rawAsteriskInputStream,
      lastActivityTimestamp: Date.now(),
      callContext: associatedCallContext,
      rawAsteriskInputFileName,
      // Outgoing RTP state; initial sequence number, timestamp and SSRC are random (RFC 3550).
      rtpOut: {
        payloadType: null, // learned from incoming packets so Asterisk maps it to the same format
        sequenceNumber: Math.floor(Math.random() * 0x10000),
        timestamp: Math.floor(Math.random() * 0x100000000),
        ssrc: Math.floor(Math.random() * 0x100000000),
        marker: true, // set on the first packet of the stream only
        pending: Buffer.alloc(0), // denoised bytes that do not yet fill a whole frame
      },
    };
    this.audioProviders.set(remoteAddress, providerContext);

    this.attachDenoisedAudioSender(providerContext, remoteAddress, rinfo);

    // Log the size of the files RnnoiseProvider reports via its events.
    const fileEvents = [
      ['input_file_closed', 'Input for RNNoise', 'closed'],
      ['output_file_ready', 'Output from RNNoise', 'ready'],
      ['denoised_wav_ready', 'Denoised WAV', 'ready'],
    ];
    for (const [eventName, label, verb] of fileEvents) {
      rnnoiseProvider.on(eventName, (filePath) => {
        if (fs.existsSync(filePath)) {
          const fileSize = fs.statSync(filePath).size;
          console.log(chalk.green(`RnnoiseProvider (${label}): File ${filePath} ${verb}. Size: ${fileSize} bytes.`));
        }
      });
    }
    return providerContext;
  }

  /**
   * Opens an append-mode dump file for the complete datagrams (RTP header included) of one
   * stream. Returns { stream, fileName }; stream is null if it could not be created.
   */
  openRawInputDump(remoteAddress) {
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const fileName = path.join(TEMP_AUDIO_DIR, `raw-asterisk-input-${timestamp}-${remoteAddress.replace(/[:.]/g, '_')}.slin16`);
    let stream = null;
    try {
      stream = fs.createWriteStream(fileName, { flags: 'a' });
      stream.on('error', (err) => {
        console.error(chalk.red(`File Stream Error (Raw Asterisk Input, ${remoteAddress}): Could not write to ${fileName}: ${err.message}`));
        if (stream && !stream.closed) {
          stream.end();
        }
      });
      stream.on('open', () => {
        console.log(chalk.green(`File Stream (Raw Asterisk Input, ${remoteAddress}): Successfully opened ${fileName} for writing.`));
      });
      stream.on('close', () => {
        console.log(chalk.green(`File Stream (Raw Asterisk Input, ${remoteAddress}): Closed ${fileName}. Final size: ${fs.existsSync(fileName) ? fs.statSync(fileName).size : 'N/A'} bytes.`));
      });
    } catch (fileErr) {
      console.error(chalk.red(`Error creating raw asterisk input file stream for ${fileName}: ${fileErr.message}`));
    }
    return { stream, fileName };
  }

  /**
   * Forwards the provider's denoised output to the sender as RTP packets of exactly
   * SLIN16_FRAME_SIZE payload bytes. Leftover bytes are kept for the next chunk; a final
   * partial frame is zero-padded when the denoised stream ends.
   */
  attachDenoisedAudioSender(providerContext, remoteAddress, rinfo) {
    const { rnnoiseProvider, rtpOut } = providerContext;
    rnnoiseProvider.denoisedAudioStream.on('data', (denoisedAudioChunk) => {
      const chunk = Buffer.isBuffer(denoisedAudioChunk) ? denoisedAudioChunk : Buffer.from(denoisedAudioChunk);
      const buffered = rtpOut.pending.length > 0 ? Buffer.concat([rtpOut.pending, chunk]) : chunk;
      let offset = 0;
      while (buffered.length - offset >= SLIN16_FRAME_SIZE) {
        this.sendRtpFrame(rtpOut, buffered.subarray(offset, offset + SLIN16_FRAME_SIZE), remoteAddress, rinfo);
        offset += SLIN16_FRAME_SIZE;
      }
      // Copy the remainder so the (possibly large) source chunk is not retained.
      rtpOut.pending = Buffer.from(buffered.subarray(offset));
    });
    rnnoiseProvider.denoisedAudioStream.on('end', () => {
      if (rtpOut.pending.length === 0) {
        return;
      }
      const lastFrame = Buffer.alloc(SLIN16_FRAME_SIZE); // zero samples = silence padding
      rtpOut.pending.copy(lastFrame);
      rtpOut.pending = Buffer.alloc(0);
      this.sendRtpFrame(rtpOut, lastFrame, remoteAddress, rinfo);
    });
  }

  /**
   * Prefixes `payload` with an RTP header built from `rtpOut`, advances the sequence
   * number/timestamp, and sends the packet to `rinfo`. Send errors are logged.
   */
  sendRtpFrame(rtpOut, payload, remoteAddress, rinfo) {
    if (rtpOut.payloadType === null) {
      console.warn(chalk.yellow(`UDP Server: No RTP payload type learned yet for ${remoteAddress}. Dropping denoised frame.`));
      return;
    }
    const header = AriRnnoiseProcessor.buildRtpHeader(
      rtpOut.payloadType, rtpOut.sequenceNumber, rtpOut.timestamp, rtpOut.ssrc, rtpOut.marker
    );
    // The RTP timestamp counts sampling instants (all channels of one instant count once).
    const bytesPerSamplingInstant = (EXPECTED_BIT_DEPTH / 8) * EXPECTED_CHANNELS;
    rtpOut.sequenceNumber = (rtpOut.sequenceNumber + 1) & 0xffff;
    rtpOut.timestamp = (rtpOut.timestamp + payload.length / bytesPerSamplingInstant) >>> 0;
    rtpOut.marker = false;
    try {
      this.server.send(Buffer.concat([header, payload]), rinfo.port, rinfo.address, (err) => {
        if (err) {
          console.error(chalk.red(`UDP Server: Error sending denoised audio to ${remoteAddress}: ${err.message}`));
        }
      });
    } catch (err) {
      // send() can also throw synchronously, e.g. after the socket has been closed.
      console.error(chalk.red(`UDP Server: Error sending denoised audio to ${remoteAddress}: ${err.message}`));
    }
  }

  /**
   * Dumps a received datagram, validates its size, strips the RTP header and writes the
   * audio payload into the stream's RnnoiseProvider. Malformed packets are dropped.
   */
  handleIncomingRtpPacket(providerContext, msg, remoteAddress) {
    // Dump the complete datagram (RTP header included) for offline inspection.
    if (providerContext.rawAsteriskInputStream && providerContext.rawAsteriskInputStream.writable) {
      providerContext.rawAsteriskInputStream.write(msg);
    }
    const expectedPacketSize = SLIN16_FRAME_SIZE + RTP_HEADER_SIZE;
    if (msg.length !== expectedPacketSize) {
      console.warn(chalk.red.bold(`UDP Server: !!! UNEXPECTED PACKET SIZE !!! Received ${msg.length} bytes from ${remoteAddress}. Expected ${expectedPacketSize} bytes (SLIN16_FRAME_SIZE + RTP_HEADER_SIZE = ${expectedPacketSize}). This indicates a potential issue!`));
    }
    const bounds = AriRnnoiseProcessor.getRtpPayloadBounds(msg);
    if (!bounds) {
      console.warn(chalk.yellow(`UDP Server: Malformed RTP packet (${msg.length} bytes) from ${remoteAddress}. Dropping it.`));
      return;
    }
    // Echo Asterisk's payload type on the way back so it maps to the same format.
    providerContext.rtpOut.payloadType = msg[1] & 0x7f;
    // NOTE(review): RTP L16 payloads are big-endian (RFC 3551); confirm that RnnoiseProvider
    // expects this byte order rather than host (little-endian) order.
    const audioChunk = msg.subarray(bounds.start, bounds.end);
    if (audioChunk.length !== SLIN16_FRAME_SIZE) {
      console.error(chalk.red.bold(`UDP Server: CRITICAL: Audio chunk size after stripping RTP header is ${audioChunk.length} bytes. Expected ${SLIN16_FRAME_SIZE} bytes. This will likely cause broken audio.`));
    }
    const { rnnoiseProvider } = providerContext;
    if (rnnoiseProvider && rnnoiseProvider.incomingAudioStream.writable) {
      rnnoiseProvider.incomingAudioStream.write(audioChunk);
    } else {
      console.warn(chalk.yellow(`UDP Server: RnnoiseProvider incomingAudioStream not writable or not initialized for ${remoteAddress}. Dropping chunk.`));
    }
  }

  /**
   * Locates the payload inside an RTP packet (RFC 3550, section 5.1), skipping CSRC
   * entries and the header extension and excluding trailing padding.
   * @param {Buffer} packet
   * @returns {{start: number, end: number} | null} Payload byte range, or null if the
   *   packet is too short or its header fields are inconsistent with its length.
   */
  static getRtpPayloadBounds(packet) {
    if (!packet || packet.length < RTP_HEADER_SIZE) {
      return null;
    }
    const firstByte = packet[0];
    const csrcCount = firstByte & 0x0f;
    const hasExtension = (firstByte & 0x10) !== 0;
    const hasPadding = (firstByte & 0x20) !== 0;
    let start = RTP_HEADER_SIZE + 4 * csrcCount;
    if (hasExtension) {
      if (packet.length < start + 4) {
        return null;
      }
      // Extension header: 16-bit profile id, 16-bit length in 32-bit words.
      start += 4 + 4 * packet.readUInt16BE(start + 2);
    }
    // With padding, the last byte holds the number of padding bytes (itself included).
    const end = hasPadding ? packet.length - packet[packet.length - 1] : packet.length;
    if (start > end) {
      return null;
    }
    return { start, end };
  }

  /**
   * Builds a 12-byte RTP header (version 2, no padding, no extension, no CSRCs).
   * Sequence number and timestamp wrap modulo 2^16 and 2^32 respectively.
   * @returns {Buffer}
   */
  static buildRtpHeader(payloadType, sequenceNumber, timestamp, ssrc, marker = false) {
    const header = Buffer.alloc(RTP_HEADER_SIZE);
    header[0] = 0x80; // V=2, P=0, X=0, CC=0
    header[1] = (marker ? 0x80 : 0) | (payloadType & 0x7f);
    header.writeUInt16BE(sequenceNumber & 0xffff, 2);
    header.writeUInt32BE(timestamp >>> 0, 4);
    header.writeUInt32BE(ssrc >>> 0, 8);
    return header;
  }

  /** Ends a provider context's raw-input dump stream if it is still open. */
  static endRawDump(providerContext) {
    const stream = providerContext.rawAsteriskInputStream;
    if (stream && !stream.closed) {
      stream.end();
    }
  }

  /**
   * Creates a mixing bridge with the initiating channel in it, then creates the external
   * media channel that streams the bridge audio to `config.listenServer`. On failure the
   * call is cleaned up; errors are logged, not thrown.
   * @returns {Promise<void>}
   */
  async createBridgeAndExternalMedia(initiatingChannel) {
    try {
      const bridge = this.client.Bridge();
      await bridge.create({ type: 'mixing' });
      console.log(chalk.blue(`Bridge ${bridge.id} created.`));

      // Attach the bridge to the call context right away, so cleanupCall destroys it if the
      // call ends or a later step fails before setup completes.
      const callContext = this.activeCalls.get(initiatingChannel.id);
      if (!callContext) {
        console.error(chalk.red(`Error: Call context not found for initiating channel ${initiatingChannel.id}.`));
        await this.safeDestroyBridge(bridge);
        await this.safeHangup(initiatingChannel, 'initiating channel');
        return;
      }
      callContext.bridge = bridge;

      await bridge.addChannel({ channel: initiatingChannel.id });
      console.log(chalk.blue(`Channel ${initiatingChannel.id} added to bridge ${bridge.id}.`));
      await initiatingChannel.unhold();

      if (this.activeCalls.get(initiatingChannel.id) !== callContext) {
        console.warn(chalk.yellow(`Call ${initiatingChannel.id} ended during bridge setup. Skipping external media creation.`));
        return;
      }

      // Register the channel on the context BEFORE asking Asterisk to create it: its
      // StasisStart may arrive before externalMedia() resolves, and handleStasisStart
      // recognises it by this pre-allocated id.
      const externalMediaChannel = this.client.Channel();
      callContext.externalMediaChannel = externalMediaChannel;
      console.log(chalk.blue(`External Media Channel ${externalMediaChannel.id} added to activeCalls map for initiating channel ${initiatingChannel.id} (PRE-EMPTIVE).`));

      console.log(chalk.magenta(`DEBUG: Calling externalMedia with format: '${this.config.format}', external_host: '${this.config.listenServer}'`));
      await externalMediaChannel.externalMedia({
        app: 'ari-rnnoise',
        format: this.config.format,
        external_host: this.config.listenServer,
        media_direction: 'both'
      });
      console.log(chalk.blue(`External Media Channel ${externalMediaChannel.id} created, connecting to ${this.config.listenServer}.`));

      if (!this.activeCalls.has(initiatingChannel.id)) {
        // The call was cleaned up while the channel was being created; do not orphan it.
        await this.safeHangup(externalMediaChannel, 'external media channel');
      }
    } catch (err) {
      console.error(chalk.red(`AriRnnoiseProcessor: Error in createBridgeAndExternalMedia for channel ${initiatingChannel.id}: ${err.message}`));
      const callContextToCleanup = this.activeCalls.get(initiatingChannel.id);
      if (callContextToCleanup) {
        // cleanupCall hangs up both channels (if not 'Down') and destroys the bridge.
        await this.cleanupCall(callContextToCleanup);
      } else if (initiatingChannel && initiatingChannel.state !== 'Down') {
        await this.safeHangup(initiatingChannel, 'initiating channel');
      }
    }
  }

  /**
   * Hangs up `channel`, logging (never throwing) failures. A "Channel not found" error is
   * treated as "already hung up".
   * @param {object} channel - ARI channel instance (ignored if falsy).
   * @param {string} label - Lower-case description used in log messages.
   * @returns {Promise<void>}
   */
  async safeHangup(channel, label) {
    if (!channel) {
      return;
    }
    try {
      await channel.hangup();
    } catch (err) {
      if (err.message !== 'Channel not found') {
        console.error(chalk.red(`Error hanging up ${label} ${channel.id}: ${err.message}`));
      } else {
        const capitalized = label.charAt(0).toUpperCase() + label.slice(1);
        console.log(chalk.gray(`${capitalized} ${channel.id} already hung up.`));
      }
    }
  }

  /**
   * Destroys `bridge`, logging (never throwing) failures. A "Bridge not found" error is
   * treated as "already destroyed".
   * @returns {Promise<void>}
   */
  async safeDestroyBridge(bridge) {
    try {
      await bridge.destroy();
    } catch (err) {
      if (err.message !== 'Bridge not found') {
        console.error(chalk.red(`Error destroying bridge ${bridge.id}: ${err.message}`));
      } else {
        console.log(chalk.gray(`Bridge ${bridge.id} already destroyed.`));
      }
    }
  }

  /**
   * Tears down a call: ends the RnnoiseProvider input, hangs up both channels, destroys
   * the bridge and releases the UDP stream associated with the call.
   *
   * The context is removed from activeCalls before any await, so repeated or concurrent
   * calls for the same context return early. Errors are logged, not thrown.
   * @param {object} callContext - Entry from activeCalls (null/undefined is ignored).
   * @returns {Promise<void>}
   */
  async cleanupCall(callContext) {
    if (!callContext) {
      return;
    }
    const initiatingChannelId = callContext.initiatingChannel.id;
    if (!this.activeCalls.has(initiatingChannelId)) {
      console.log(chalk.gray(`Cleanup already in progress or call context for ${initiatingChannelId} not found.`));
      return;
    }
    this.activeCalls.delete(initiatingChannelId);
    console.log(chalk.yellow(`Cleaning up call context for channel ${initiatingChannelId}.`));

    try {
      const { rnnoiseProvider, initiatingChannel, externalMediaChannel, bridge } = callContext;
      if (rnnoiseProvider && rnnoiseProvider.incomingAudioStream) {
        console.log(chalk.blue(`RnnoiseProcessor: Ending RnnoiseProvider incomingAudioStream for channel ${initiatingChannelId}.`));
        rnnoiseProvider.incomingAudioStream.end();
      }
      // NOTE(review): the original code states RnnoiseProvider cleans up its files when it is
      // stopped, but this path only ends its input stream (the inactivity sweep and the server
      // 'close' handler call stop()). Confirm whether stop() should also be called here.

      if (initiatingChannel && initiatingChannel.state !== 'Down') {
        console.log(chalk.yellow(`Hanging up initiating channel ${initiatingChannel.id}.`));
        await this.safeHangup(initiatingChannel, 'initiating channel');
      }
      if (externalMediaChannel && externalMediaChannel.state !== 'Down') {
        console.log(chalk.yellow(`Hanging up external media channel ${externalMediaChannel.id}.`));
        await this.safeHangup(externalMediaChannel, 'external media channel');
      }
      if (bridge) {
        console.log(chalk.yellow(`Destroying bridge ${bridge.id}.`));
        await this.safeDestroyBridge(bridge);
      }

      // Done after the hangups, so packets still arriving for this call keep hitting its
      // (ended) provider context instead of being treated as a new, unassociated stream.
      for (const [remoteAddress, cc] of this.remoteAddressToCallContextMap) {
        if (cc !== callContext) {
          continue;
        }
        console.log(chalk.gray(`Removing ${remoteAddress} from remoteAddressToCallContextMap during cleanup.`));
        this.remoteAddressToCallContextMap.delete(remoteAddress);
        const providerContext = this.audioProviders.get(remoteAddress);
        if (providerContext) {
          AriRnnoiseProcessor.endRawDump(providerContext);
          this.audioProviders.delete(remoteAddress);
          console.log(chalk.gray(`Removed ${remoteAddress} from audioProviders map.`));
        }
        break;
      }
    } catch (err) {
      console.error(chalk.red(`Error during cleanup: ${err.message}`));
    }
  }
}
// Public API of this module: the ARI/RNNoise processor class.
Object.assign(module.exports, { AriRnnoiseProcessor });