Commit a1acae88 authored by Vasiliy's avatar Vasiliy

Initial Commit

parents
FROM golang:1.15-alpine AS build
# Installing requirements
RUN apk add --update git && \
rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/*
# Creating workdir and copying dependencies
WORKDIR /go/src/app
COPY . .
# Installing dependencies
RUN go get
ENV CGO_ENABLED=0
RUN go build -o api main.go requests.go
FROM alpine:3.9.6
RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing/" >> /etc/apk/repositories && \
apk add --update bash && \
rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/*
WORKDIR /app
COPY --from=build /go/src/app/api /app/api
COPY ./migrations/ /app/migrations/
CMD ["/app/api"]
module mod
go 1.15
require (
github.com/Netflix/go-env v0.0.0-20200908232752-3e802f601e28
github.com/golang-migrate/migrate/v4 v4.13.0
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
github.com/lib/pq v1.8.0
github.com/streadway/amqp v1.0.0 // indirect
)
This diff is collapsed.
package main
import (
"encoding/json"
"os"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"database/sql"
env "github.com/Netflix/go-env"
_ "github.com/lib/pq"
"log"
"net/http"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/streadway/amqp"
)
var (
db *sql.DB
queue amqp.Queue
ch *amqp.Channel
)
type environment struct {
PgsqlURI string `env:"PGSQL_URI"`
Listen string `env:"LISTEN"`
RabbitURI string `env:"RABBIT_URI"`
}
type jsonResponse struct {
Success bool `json:"success"`
Message string `json:"message"`
Data *interface{} `json:"data"`
}
func returnResponse(code int, msg string, data *interface{}, w http.ResponseWriter) {
success := true
if code >= 400 {
success = false
}
respStruct := &jsonResponse{Success: success, Message: msg, Data: data}
resp, _ := json.Marshal(respStruct)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
w.Write(resp)
w.Write([]byte("\n"))
return
}
func main() {
var err error
// Getting configuration
log.Printf("INFO: Getting environment variables\n")
cnf := environment{}
_, err = env.UnmarshalFromEnviron(&cnf)
if err != nil {
log.Fatal(err)
}
// Connecting to database
log.Printf("INFO: Connecting to database")
db, err = sql.Open("postgres", cnf.PgsqlURI)
if err != nil {
log.Fatalf("Can't connect to postgresql: %v", err)
}
// Running migrations
driver, err := postgres.WithInstance(db, &postgres.Config{})
if err != nil {
log.Fatalf("Can't get postgres driver: %v", err)
}
m, err := migrate.NewWithDatabaseInstance("file://./migrations", "postgres", driver)
if err != nil {
log.Fatalf("Can't get migration object: %v", err)
}
m.Up()
// Initialising rabbit mq
// Initing rabbitmq
conn, err := amqp.Dial(cnf.RabbitURI)
if err != nil {
log.Fatalf("Can't connect to rabbitmq")
}
defer conn.Close()
ch, err = conn.Channel()
if err != nil {
log.Fatalf("Can't open channel")
}
defer ch.Close()
err = initRabbit()
if err != nil {
log.Fatalf("Can't create rabbitmq queues: %s\n", err)
}
// Setting handlers for query
log.Printf("INFO: Starting listening on %s\n", cnf.Listen)
router := mux.NewRouter().StrictSlash(true)
// PROJECTS
router.HandleFunc("/requests", authMiddleware(getRequests)).Methods("GET")
router.HandleFunc("/requests", authMiddleware(addRequest)).Methods("POST")
router.HandleFunc("/requests/{name}", authMiddleware(getRequest)).Methods("GET")
router.HandleFunc("/requests/{name}", authMiddleware(updRequest)).Methods("PUT")
router.HandleFunc("/requests/{name}", authMiddleware(delRequest)).Methods("DELETE")
http.ListenAndServe(cnf.Listen, handlers.LoggingHandler(os.Stdout, router))
}
func notImplemented(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Not Implemented\n"))
}
func authMiddleware(next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokenString := r.Header.Get("X-API-KEY")
if tokenString != "804b95f13b714ee9912b19861faf3d25" {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("Missing Authorization Header\n"))
return
}
next(w, r)
})
}
func initRabbit() error {
err := ch.ExchangeDeclare(
"VideoParserExchange", // name
"fanout", // type
true, // durable
false, // auto delete
false, // internal
false, // no wait
nil, // arguments
)
if err != nil {
return err
}
err = ch.ExchangeDeclare(
"VideoParserRetryExchange", // name
"fanout", // type
true, // durable
false, // auto delete
false, // internal
false, // no wait
nil, // arguments
)
if err != nil {
return err
}
args := amqp.Table{"x-dead-letter-exchange": "VideoParserRetryExchange"}
queue, err = ch.QueueDeclare(
"VideoParserWorkerQueue", // name
true, // durable - flush to disk
false, // delete when unused
false, // exclusive - only accessible by the connection that declares
false, // no-wait - the queue will assume to be declared on the server
args, // arguments -
)
if err != nil {
return err
}
args = amqp.Table{"x-dead-letter-exchange": "VideoParserExchange", "x-message-ttl": 60000}
queue, err = ch.QueueDeclare(
"VideoParserWorkerRetryQueue", // name
true, // durable - flush to disk
false, // delete when unused
false, // exclusive - only accessible by the connection that declares
false, // no-wait - the queue will assume to be declared on the server
args, // arguments -
)
if err != nil {
return err
}
queue, err = ch.QueueDeclare(
"VideoParserArchiveQueue", // name
true, // durable - flush to disk
false, // delete when unused
false, // exclusive - only accessible by the connection that declares
false, // no-wait - the queue will assume to be declared on the server
nil, // arguments -
)
if err != nil {
return err
}
err = ch.QueueBind("VideoParserWorkerQueue", "*", "VideoParserExchange", false, nil)
if err != nil {
return err
}
err = ch.QueueBind("VideoParserArchiveQueue", "*", "VideoParserExchange", false, nil)
if err != nil {
return err
}
err = ch.QueueBind("VideoParserWorkerRetryQueue", "*", "VideoParserRetryExchange", false, nil)
if err != nil {
return err
}
return nil
}
CREATE TABLE IF NOT EXISTS requests (
id SERIAL,
name VARCHAR(256),
description VARCHAR(2048),
video_url VARCHAR(64),
text_url VARCHAR(64),
processed BOOL DEFAULT FALSE,
archived BOOL DEFAULT FALSE,
created_at TIMESTAMP DEFAULT now(),
updated_at TIMESTAMP DEFAULT null,
UNIQUE(name)
);
package main
import (
"database/sql"
"encoding/json"
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/streadway/amqp"
)
type Request struct {
ID int `json:"-"`
Name string `json:"name"`
Description string `json:"description"`
VideoURL string `json:"video_url"`
TextURL string `json:"text_url"`
Archived bool `json:"archived"`
Processed bool `json:"processed"`
CreatedAt *time.Time `json:"created_at"`
UpdatedAt *time.Time `json:"updated_at"`
}
type Requests []Request
func (r *Request) isExist() (bool, error) {
var requestsCount int
stmt := `SELECT count(id) FROM requests WHERE name = $1 AND NOT archived`
err := db.QueryRow(stmt, r.Name).Scan(&requestsCount)
if err != nil {
return false, err
}
if requestsCount == 0 {
return false, nil
}
return true, nil
}
func (r *Request) load() error {
stmt := `SELECT id, name, description, processed, video_url, text_url, created_at, updated_at FROM requests WHERE name = $1 AND NOT archived`
err := db.QueryRow(stmt, r.Name).Scan(&r.ID, &r.Name, &r.Description, &r.Processed, &r.VideoURL, &r.TextURL, &r.CreatedAt, &r.UpdatedAt)
if err != nil {
return err
}
return nil
}
// Add new device or recreate the old one
type postRequestRequest struct {
Name *string `json:"name"`
Description *string `json:"description"`
Processed *bool `json:"processed"`
VideoURL *string `json:"video_url"`
TextURL *string `json:"text_url"`
}
func addRequest(w http.ResponseWriter, r *http.Request) {
// Parsing event
req := postRequestRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
log.Printf("WARNING: Can't parse incoming request: %s\n", err)
returnResponse(400, "Can't parse json", nil, w)
return
}
request := Request{}
if req.Name == nil {
returnResponse(400, "name can't be null", nil, w)
return
}
request.Name = *req.Name
if req.Description != nil {
request.Description = *req.Description
}
if req.Processed != nil {
request.Processed = *req.Processed
}
if req.VideoURL != nil {
request.VideoURL = *req.VideoURL
}
if req.TextURL != nil {
request.TextURL = *req.TextURL
}
// Publishing data to rabbitmq
msg, err := json.Marshal(request)
if err != nil {
log.Printf("ERROR: Marshaling request: %s\n", err)
returnResponse(500, "Can't marshal request ", nil, w)
return
}
err = ch.Publish(
"VideoParserExchange", // exchange
"", // routing key
false, // mandatory - could return an error if there are no consumers or queue
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: msg,
})
if err != nil {
log.Printf("ERROR: Publishing to rabbit: %s\n", err)
returnResponse(500, "Can't publish to rabbit ", nil, w)
return
}
stmt := `INSERT INTO requests (name, description, processed, video_url, text_url) VALUES ($1, $2, $3, $4, $5) RETURNING id`
err = db.QueryRow(stmt, &request.Name, &request.Description, &request.Processed, &request.VideoURL, &request.TextURL).Scan(&request.ID)
if err != nil {
log.Printf("ERROR: Adding new request to database: %s\n", err)
returnResponse(500, "Can't add new request ", nil, w)
return
}
returnResponse(200, "Successfully added new request", nil, w)
}
func getRequests(w http.ResponseWriter, r *http.Request) {
stmt := `SELECT id, name, description, created_at, updated_at FROM requests WHERE not archived`
rows, err := db.Query(stmt)
switch {
case err == sql.ErrNoRows:
returnResponse(200, "No requests found", nil, w)
return
case err != nil:
log.Printf("ERROR: Can't get requests: %s\n", err)
returnResponse(500, "Can't get requests", nil, w)
return
}
defer rows.Close()
var requests Requests
for rows.Next() {
request := Request{}
err = rows.Scan(&request.ID, &request.Name, &request.Description, &request.CreatedAt, &request.UpdatedAt)
if err != nil {
log.Printf("ERROR: Can't scan request: %s\n", err)
returnResponse(500, "Can't scan request", nil, w)
return
}
requests = append(requests, request)
}
err = rows.Err()
if err != nil {
log.Printf("ERROR: Can't scan request: %s\n", err)
returnResponse(500, "Can't scan request", nil, w)
return
}
var data interface{}
data = requests
returnResponse(200, "Successfully got all the requests", &data, w)
}
func getRequest(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
request := Request{}
request.Name = vars["name"]
ok, err := request.isExist()
if err != nil {
log.Printf("ERROR: Can't check if request exists: %s\n", err)
returnResponse(500, "Can't check request", nil, w)
return
}
if !ok {
returnResponse(400, "Request doesn't exist", nil, w)
return
}
err = request.load()
if err != nil {
log.Printf("ERROR: Can't load request: %s\n", err)
returnResponse(500, "Can't load request", nil, w)
return
}
var data interface{}
data = request
returnResponse(200, "Successfully got the request info", &data, w)
}
func updRequest(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
request := Request{}
request.Name = vars["name"]
ok, err := request.isExist()
if err != nil {
log.Printf("ERROR: Can't check if request exists: %s\n", err)
returnResponse(500, "Can't check request", nil, w)
return
}
if !ok {
returnResponse(400, "Request doesn't exist", nil, w)
return
}
err = request.load()
if err != nil {
returnResponse(500, "Can't load request info", nil, w)
return
}
// Parsing event
req := postRequestRequest{}
err = json.NewDecoder(r.Body).Decode(&req)
if err != nil {
log.Printf("WARNING: Can't parse incoming request: %s\n", err)
returnResponse(400, "Can't parse json", nil, w)
return
}
if req.Description != nil {
stmt := `UPDATE requests SET description = $1, updated_at = now() WHERE id = $2`
_, err = db.Exec(stmt, req.Description, request.ID)
if err != nil {
returnResponse(500, "Error updating request", nil, w)
return
}
}
if req.Processed != nil {
stmt := `UPDATE requests SET processed = $1, updated_at = now() WHERE id = $2`
_, err = db.Exec(stmt, req.Processed, request.ID)
if err != nil {
returnResponse(500, "Error updating request", nil, w)
return
}
}
if req.TextURL != nil {
stmt := `UPDATE requests SET text_url = $1, updated_at = now() WHERE id = $2`
_, err = db.Exec(stmt, req.TextURL, request.ID)
if err != nil {
returnResponse(500, "Error updating request", nil, w)
return
}
}
returnResponse(200, "Successfully updated request", nil, w)
}
func delRequest(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
request := Request{}
request.Name = vars["name"]
// Checking if device exists
ok, err := request.isExist()
if err != nil {
log.Printf("ERROR: Can't check if request exists: %s\n", err)
returnResponse(500, "Can't check request", nil, w)
return
}
if !ok {
returnResponse(400, "Request doesn't exist", nil, w)
return
}
stmt := `UPDATE requests SET archived = true, updated_at = now() WHERE name = $1`
_, err = db.Exec(stmt, request.Name)
if err != nil {
returnResponse(500, "Error deleting request", nil, w)
return
}
returnResponse(200, "Successfully deleted request", nil, w)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment