worker.py 4.96 KB
Newer Older
Vasiliy's avatar
Vasiliy committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3

from __future__ import unicode_literals

# System modules
import json
import os
import sys
import time
import logging
import subprocess

# Third party modules
import pika
import requests
from environs import Env
import youtube_dl
import boto3
from urllib import parse

# Read configuration from environment variables; a local .env file is loaded
# as well if it exists. Variables read without a default are mandatory.
env = Env()
env.read_env()  # read .env file, if it exists

# RabbitMQ settings. The port goes through env.int: a value taken from the
# environment is always a str, and pika's ConnectionParameters only accepts an
# int port (a str raises TypeError at connection time).
rabbitmq_host = env("RABBIT_HOST", 'localhost')
rabbitmq_port = env.int("RABBIT_PORT", 5672)
rabbitmq_user = env("RABBIT_USER")
rabbitmq_pass = env("RABBIT_PASS")
rabbitmq_queue = env("RABBIT_QUEUE", "AutoUrlTemplateQueue")

# API that receives the per-request result callback (PUT /requests/<name>).
api_url = env("API_URL")
api_key = env("API_KEY")

# S3-compatible storage for transcripts. s3_web is presumably the public base
# URL under which uploaded objects are reachable (it is only used to build the
# callback link) — confirm against deployment config.
s3_endpoint = env("S3_ENDPOINT")
s3_web = env("S3_WEB")
s3_access_key = env("S3_ACCESS_KEY")
s3_secret_key = env("S3_SECRET_KEY")
s3_bucket = env("S3_BUCKET")

# Connect to RabbitMQ on the default virtual host '/'.
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_pass)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(rabbitmq_host, rabbitmq_port, '/', credentials))
channel = connection.channel()
# NOTE(review): each message is processed (download + transcription) inside
# this BlockingConnection's dispatch loop. If one job outlasts the negotiated
# heartbeat timeout the broker may close the connection — confirm in
# production and tune `heartbeat` or offload the work to a thread if needed.

s3client = boto3.client('s3', endpoint_url=s3_endpoint,
                        aws_access_key_id=s3_access_key,
                        aws_secret_access_key=s3_secret_key)

def _send_callback(name, text_url, timeout=30):
    """Report the processing outcome for request ``name`` back to the API.

    Issues ``PUT <API_URL>/requests/<name>`` with the JSON body
    ``{"processed": true, "text_url": <text_url>}`` and the ``X-API-KEY``
    header, then logs the response status code.

    :param name: request identifier taken from the queue message.
    :param text_url: public transcript URL on success, or a short error
        description on failure.
    :param timeout: seconds to wait for the API. It is bounded so a stalled
        API cannot block the consumer indefinitely.
    :raises requests.RequestException: on network errors or timeouts. It is
        deliberately not caught, so the caller skips the ack and the message
        is redelivered later.
    """
    headers = {"X-API-KEY": api_key}
    payload = {"processed": True, "text_url": text_url}
    r = requests.put(api_url + '/requests/' + name, data=json.dumps(payload),
                     headers=headers, timeout=timeout)
    logger.info("Callback sent, response code: " + str(r.status_code))


def process_event(ch, method, properties, body):
    """Download, transcribe and publish the audio of one queued video.

    This is the pika ``on_message_callback``. ``body`` must be a JSON object
    whose ``name`` and ``video_url`` values are strings.

    Steps:

    1. Download the audio with youtube_dl into ``/app`` as a 16 kHz mono WAV.
    2. Transcribe it with the Leopard demo binary.
    3. Upload the transcript to S3 as ``converted/<name>.txt``.

    Every handled outcome (success URL or error text) is reported to the API
    and then acknowledged, so a finished job is not redelivered. Messages that
    cannot be parsed are rejected without requeueing.

    :param ch: channel the message arrived on (used to ack/reject).
    :param method: delivery info; ``method.delivery_tag`` identifies the message.
    :param properties: message properties (unused).
    :param body: raw message payload.
    """
    logger.info("Got new meessage from rabbitmq %r", body)
    t_start = time.time()

    # Parse the message. Anything unusable is rejected instead of raising,
    # which would kill the consumer and make the message redeliver forever.
    try:
        event = json.loads(body)
        name, video_url = event['name'], event['video_url']
    except (ValueError, KeyError, TypeError):
        name = video_url = None
    if not isinstance(name, str) or not isinstance(video_url, str):
        logger.error("Malformed message, rejecting it: %r", body)
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    logger.info("Got name: " + str(name) + ", video_url: " + str(video_url))

    def finish(text_url):
        # Notify the API first and ack only afterwards. If the callback
        # raises, the message stays unacked and will be redelivered.
        _send_callback(name, text_url)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Work in /app: youtube_dl writes '<id>.wav' there, and the Leopard
    # command below uses paths relative to it. Remove WAV files left by
    # earlier jobs so the '*.wav' glob only matches this job's audio.
    os.chdir("/app")
    for stale in os.listdir("/app"):
        if stale.endswith(".wav"):
            os.remove(os.path.join("/app", stale))

    # Download the best audio stream and convert it to 16 kHz mono WAV.
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }],
        'postprocessor_args': ['-ar', '16000', '-ac', '1'],
        'prefer_ffmpeg': True,
        'keepvideo': False,
        'outtmpl': '%(id)s.%(ext)s',
    }
    try:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
    except Exception:
        logger.exception("Can't download audio file, sending callback")
        finish("Error downloading video")
        return

    # Convert audio to text. With shell=True a failing command does not
    # raise, so the exit status has to be checked explicitly.
    logger.info("Converting audio to text")
    leopard_cmd = ("/app/leopard/leopard_demo "
                   "leopard/lib/linux/x86_64/libpv_leopard.so "
                   "leopard/lib/common/acoustic_model.pv "
                   "leopard/lib/common/language_model.pv "
                   "license.lic *.wav")
    try:
        result = subprocess.run(leopard_cmd, stdout=subprocess.PIPE, shell=True)
    except Exception:
        logger.exception("Can't convert audio to text, sending callback")
        finish("Error converting audio")
        return
    output = result.stdout
    logger.info("Command output : " + str(output))
    logger.info("Command exit status/return code : " + str(result.returncode))
    if result.returncode != 0:
        logger.error("Can't convert audio to text, sending callback")
        finish("Error converting audio")
        return

    # Upload the raw transcript bytes as a publicly readable object.
    key = 'converted/' + name + '.txt'
    try:
        s3client.put_object(Body=output, Bucket=s3_bucket, Key=key,
                            ACL='public-read')
    except Exception:
        logger.exception("Can't upload text to s3, sending callback")
        finish("Error uploading to s3")
        return

    # The public link must match the uploaded key, including '.txt'.
    finish(s3_web + "/" + key)

    t_elapsed = time.time() - t_start
    logger.info("Finished with " + str(video_url) + " in " + str(t_elapsed) + " seconds")

# Root logging: "timestamp | level | logger name | message" at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%d-%b-%y %H:%M:%S',
    format='%(asctime)s | %(levelname)-7s | %(name)-12s | %(message)s',
)
logger = logging.getLogger(__name__)

# Subscribe process_event to the work queue with manual acknowledgements,
# then block in pika's dispatch loop until the process is interrupted.
logger.info('Waiting for messages. To exit press CTRL+C')
channel.basic_consume(
    queue=rabbitmq_queue,
    on_message_callback=process_event,
    auto_ack=False,
)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    logger.info('Interrupted')
    # Terminate immediately with status 0, skipping interpreter cleanup.
    os._exit(0)