RaspberryPi-Security-Camera / SecurityCamera / alerttools / notifications / twilionotifier.py
twilionotifier.py
Raw
# import the necessary packages
from twilio.rest import Client
import boto3
from threading import Thread
import queue


class TwilioNotifier:
    def __init__(self, conf):
        # store the configuration object
        self.conf = conf

    def send(self, msg, tempVideo, timelapse=False):
        # start a thread to upload the file and send it
        ret_VideoName = queue.Queue()
        ret_videoUrl = queue.Queue()

        t = Thread(target=self._send, args=(msg, tempVideo, ret_VideoName, ret_videoUrl, timelapse,))
        t.start()
        ret1 = t.join()

        VideoName = ret_VideoName.get()
        videoUrl = ret_videoUrl.get()

        return VideoName, videoUrl
        # return fileName,fileUrl

    def _send(self, msg, tempVideo, out_VideoName, out_videoUrl, timelapse):
        # create a s3 client object
        s3 = boto3.client("s3",
                          aws_access_key_id=self.conf["aws_access_key_id"],
                          aws_secret_access_key=self.conf["aws_secret_access_key"],
                          )

        # get the filename and upload the video in public read mode
        if (timelapse):
            filename = tempVideo.split("/home/pi/SecurityCamera/tempTl/", 1)[1]
            print(filename)
            s3.upload_file(tempVideo, self.conf["s3_bucket"], filename,
                           ExtraArgs={"ACL": "public-read", "ContentType": "video/mp4"})
        else:
            filename = tempVideo.path[tempVideo.path.rfind("/") + 1:]
            s3.upload_file(tempVideo.path, self.conf["s3_bucket"], filename,
                           ExtraArgs={"ACL": "public-read", "ContentType": "video/mp4"})

        # get the bucket location and build the url
        location = s3.get_bucket_location(
            Bucket=self.conf["s3_bucket"])["LocationConstraint"]
        url = "https://s3-{}.amazonaws.com/{}/{}".format(location,
                                                         self.conf["s3_bucket"], filename)

        msg = msg + 'You can check the video by clicking on following link: ' + url
        # initialize the twilio client and send the message
        print(msg)

        out_VideoName.put(filename)
        out_videoUrl.put(url)

        # Below code sends SMS alert to user via Twilio
        client = Client(self.conf["twilio_sid"], self.conf["twilio_auth"])
        client.messages.create(to=self.conf["twilio_to"], from_=self.conf["twilio_from"], body=msg)

        # delete the temporary file
        if (timelapse == False):
            tempVideo.cleanup()