#!/usr/bin/python3
import anyio
import asyncari
import logging
import aioudp
import os
import vosk
import array
import sys
import aiohttp
# Asterisk ARI connection settings. Every value can be overridden through the
# environment; the literals are the fallbacks used when a variable is unset.
ast_host = os.getenv("AST_HOST", '127.0.0.1')
ast_port = int(os.getenv("AST_ARI_PORT", 8088))
# AST_URL, when set, takes precedence over the host/port pair above.
ast_url = os.getenv("AST_URL", 'http://%s:%d/' % (ast_host, ast_port))
ast_username = os.getenv("AST_USER", 'asterisk')
ast_password = os.getenv("AST_PASS", 'asterisk')
ast_app = os.getenv("AST_APP", 'v')

# Vosk model, loaded once at import time and handed to every recognizer.
# VOSK_MODEL_PATH overrides the default model directory so a different model
# can be used without editing this file.
model = vosk.Model(
    os.getenv("VOSK_MODEL_PATH", 'vosk-model-pt-fb-v0.1.1-20220516_2113/')
)

# Active transcription channels, keyed by snoop channel id.
channels = {}
class Channel:
    """Transcribe the audio of one Asterisk channel with a Vosk recognizer.

    ``init`` opens a local UDP listener, asks Asterisk to stream audio to it
    through an external-media channel, and places that media channel and the
    given channel in a mixing bridge. Each received datagram is fed to the
    recognizer and the partial or final result is printed.
    """

    # Number of leading bytes stripped from every datagram: the size of the
    # fixed RTP header. Assumes no CSRC entries or header extension follow it.
    RTP_HEADER_SIZE = 12

    def __init__(self):
        self.port = None  # UDP port the listener is bound to (set by init)
        self.rec = None   # vosk.KaldiRecognizer fed by rtp_handler
        self.udp = None   # entered aioudp server context, released by close

    @staticmethod
    def _rtp_payload_to_pcm(message):
        """Return the byte-swapped 16-bit samples carried by one datagram.

        The first ``RTP_HEADER_SIZE`` bytes are discarded and every remaining
        pair of bytes is swapped. A dangling odd byte at the end of a
        malformed payload is dropped instead of raising ``ValueError``.

        Args:
            message: Raw datagram bytes as received from the socket.

        Returns:
            The swapped payload as ``bytes``; empty when the datagram holds no
            complete sample.
        """
        payload = message[Channel.RTP_HEADER_SIZE:]
        # array('h') requires a whole number of 2-byte items.
        payload = payload[:len(payload) - (len(payload) % 2)]
        samples = array.array('h', payload)
        # Presumably converts network-order samples to the layout the
        # recognizer expects -- the swap itself is unchanged from before.
        samples.byteswap()
        return samples.tobytes()

    async def rtp_handler(self, connection):
        """Feed every datagram from ``connection`` to the recognizer.

        Prints the recognizer's final result when ``AcceptWaveform`` returns
        true and its partial result otherwise.
        """
        async for message in connection:
            pcm = self._rtp_payload_to_pcm(message)
            if not pcm:
                # Header-only or truncated datagram: nothing to recognize.
                continue
            if self.rec.AcceptWaveform(pcm):
                res = self.rec.Result()
            else:
                res = self.rec.PartialResult()
            print(res)

    async def init(self, client, channel):
        """Start the UDP listener and wire ``channel`` to it through Asterisk.

        Args:
            client: Connected ARI client used to create the bridge and the
                external-media channel.
            channel: Channel object whose ``id`` is added to the bridge.

        Raises:
            Exception: Whatever the ARI calls raise; the UDP listener is
                released before the error propagates.
        """
        # One port per registered channel, counted from the module registry.
        self.port = 45000 + len(channels)
        # 16000 Hz matches the 'slin16' format requested below.
        self.rec = vosk.KaldiRecognizer(model, 16000)
        server = aioudp.serve("127.0.0.1", self.port, self.rtp_handler)
        await server.__aenter__()
        # Only remember the context once it is entered, so close() never
        # exits a context that was not successfully opened.
        self.udp = server
        try:
            bridge = await client.bridges.create(type='mixing')
            media_id = client.generate_id()
            logging.info("O canal do Unicast é: " + str(media_id))
            # Create an external-media channel that sends audio to our port.
            await client.channels.externalMedia(
                channelId=media_id,
                app=ast_app,
                external_host='127.0.0.1:' + str(self.port),
                format='slin16'
            )
            # Add the snooped channel and the external-media channel to the
            # bridge.
            await bridge.addChannel(channel=[media_id, channel.id])
        except Exception:
            # Do not leak the bound socket when the ARI setup fails.
            await self.close()
            raise

    async def close(self):
        """Release the UDP listener opened by ``init``; safe to call twice."""
        server, self.udp = self.udp, None
        if server is not None:
            await server.__aexit__(None, None, None)
async def create_channel_in_stasis(client, endpoint):
    """Originate a new channel on ``endpoint`` into the Stasis application.

    Args:
        client: Connected ARI client.
        endpoint: Endpoint string passed through to ``channels.originate``.

    Returns:
        Whatever ``client.channels.originate`` returns for the new channel.
    """
    originate_args = {
        'endpoint': endpoint,
        'app': ast_app,
        'appArgs': ['stasis-start'],
    }
    return await client.channels.originate(**originate_args)
async def main(channel_id):
    """Snoop on ``channel_id`` and transcribe its audio until interrupted.

    Connects to ARI, creates a snoop channel on the existing channel, starts a
    ``Channel`` transcriber for it, registers the transcriber in ``channels``
    and then sleeps forever.

    Args:
        channel_id: Id of the existing Asterisk channel to snoop on.
    """
    async with asyncari.connect(ast_url, ast_app, ast_username, ast_password) as client:
        # Snoop channel on the existing call. spy='in' selects the audio
        # coming into the channel; whisper='none' injects nothing, so the
        # original audio is not affected.
        # NOTE(review): if both directions should be transcribed, spy='both'
        # would be needed -- confirm intent.
        spied = await client.channels.snoopChannel(
            channelId=channel_id, app=ast_app, spy='in', whisper='none',
        )

        # Start the transcriber first: it derives its UDP port from the
        # current size of ``channels``, so it must run before registration.
        transcriber = Channel()
        await transcriber.init(client, spied)
        channels[spied.id] = transcriber

        # Keep the script running so the Stasis application stays active.
        await anyio.sleep_forever()
if __name__ == "__main__":
    # Exactly one positional argument is expected: the id of the channel to
    # snoop on. Anything else prints the usage line and exits with status 1.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Uso: {} <channel_id>".format(sys.argv[0]))
        sys.exit(1)

    logging.basicConfig(level=logging.INFO)
    (target_channel_id,) = cli_args
    anyio.run(main, target_channel_id)
After the call enters the queue, look up the ID of the channel to be transcribed, either with the Asterisk CLI command "core show channels" or with: curl -u asterisk:asterisk http://localhost:8088/ari/channels
Then run the script with Python, passing that channel ID as its only argument: python3 script.py <channel_id>
A possible improvement is to use Flask or FastAPI to receive the channel ID via an HTTP POST as soon as the call is answered in the queue, using a dialplan entry such as: exten => 300,n(qcall),Queue(300,${QOPTIONS},${QAANNOUNCE},${QMAXWAIT},${QAGI},${QGOSUB}channel,s,1,${QRULE},${QPOSITION},)
[channel]; Send channel to flask
exten => s,1,Progress()
exten => s,n,Noop(${MCORGCHAN})
exten => s,n,System(/usr/bin/python3 /var/lib/asterisk/agi-bin/flask-canal.py ${MCORGCHAN} &)
exten => s,n,Noop(${MEDIA_ID}); MEDIA_ID must be sent via global variable in the flask as soon as transcription starts
exten => s,n,Return()