10000 [feature] Initial FTP support as a storage option by alfg · Pull Request #43 · alfg/openencoder · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[feature] Initial FTP support as a storage option #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ media/
progress-log.txt
vendor/
.vscode/
examples/
examples/
data/
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
<h1><code>openencoder</code></h1>
<p><strong>Open Source Cloud Encoder for FFmpeg</strong></p>
<p>A distributed and scalable video encoding pipeline to be used
as an API or web interface using your own hosted infrastructure and FFmpeg encoding presets.</p>
<p>⚠️ Currently a work-in-progress! Check back for updates!</p>
as an API or web interface using your own hosted or cloud infrastructure
and FFmpeg encoding presets.
</p>
<p>⚠️ Currently functional, but a work-in-progress! Check back for updates!</p>
<p>
<a href="https://travis-ci.org/alfg/openencoder">
<img src="https://travis-ci.org/alfg/openencoder.svg?branch=master" alt="Build Status" />
Expand All @@ -25,10 +27,10 @@


## Features
* HTTP API for submitting jobs to an redis-backed FFmpeg worker
* S3 storage (AWS and Digital Ocean)
* Web Dashboard UI for managing encode jobs
* Machines UI/API for scaling worker instances
* HTTP API for submitting jobs to a redis-backed FFmpeg worker
* S3 storage (AWS and Digital Ocean supported)
* Web Dashboard UI for managing encode jobs, workers and users
* Machines UI/API for scaling cloud worker instances
* Database-stored FFmpeg encoding presets
* User accounts and roles

Expand Down Expand Up @@ -99,5 +101,6 @@ See: [wiki](https://github.com/alfg/openencoder/wiki) for more documentation.
## Roadmap
See: [Development Project](https://github.com/alfg/openencoder/projects/1) for current development tasks and status.


## License
MIT
1 change: 0 additions & 1 deletion api/data/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (s SettingsOp) GetSettings() []types.Setting {
settings[i].Value = string(plaintext)
}
}

return settings
}

Expand Down
91 changes: 78 additions & 13 deletions api/net/download.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,91 @@
package net

import (
"errors"

"github.com/alfg/openencoder/api/data"
"github.com/alfg/openencoder/api/types"
)

// DownloadFunc creates a download.
type DownloadFunc func(job types.Job) error
// Download fetches the source of the given job using whichever storage
// driver is configured under the types.StorageDriver setting.
//
// It returns the error of the selected driver's download, or a
// "no driver set" error when the setting is neither "s3" nor "ftp".
func Download(job types.Job) error {
	storageDriver := data.New().Settings.GetSetting(types.StorageDriver).Value

	switch storageDriver {
	case "s3":
		return s3Download(job)
	case "ftp":
		return ftpDownload(job)
	default:
		return errors.New("no driver set")
	}
}

// GetDownloader sets the download function.
func GetDownloader() *S3 {
// GetPresignedURL builds an S3 client from the stored settings and asks it
// for a presigned URL for the given job.
//
// The string and error returned by the S3 client are passed through
// unchanged.
func GetPresignedURL(job types.Job) (string, error) {
	stored := data.New().Settings.GetSettings()

	var cfg S3Config
	cfg.AccessKey = types.GetSetting(types.S3AccessKey, stored)
	cfg.SecretKey = types.GetSetting(types.S3SecretKey, stored)
	cfg.Provider = types.GetSetting(types.S3Provider, stored)
	// The client region is taken from the outbound bucket region setting.
	cfg.Region = types.GetSetting(types.S3OutboundBucketRegion, stored)
	cfg.InboundBucket = types.GetSetting(types.S3InboundBucket, stored)
	cfg.OutboundBucket = types.GetSetting(types.S3OutboundBucket, stored)

	return NewS3(cfg).GetPresignedURL(job)
}

// Get credentials from settings.
// s3Download downloads the job source from S3 and starts
// trackTransferProgress in a goroutine for the job's encode ID while the
// download runs. It returns the error from looking up the job or from the
// S3 download itself.
//
// NOTE(review): this span looks like a rendered diff with removed and added
// lines interleaved (the per-key GetSetting reads and the six-argument NewS3
// call sit next to their S3Config-based replacement) — confirm against the
// real file before relying on it.
func s3Download(job types.Job) error {
db := data.New()
ak := db.Settings.GetSetting(types.S3AccessKey).Value
sk := db.Settings.GetSetting(types.S3SecretKey).Value
pv := db.Settings.GetSetting(types.S3Provider).Value
rg := db.Settings.GetSetting(types.S3OutboundBucketRegion).Value
ib := db.Settings.GetSetting(types.S3InboundBucket).Value
ob := db.Settings.GetSetting(types.S3OutboundBucket).Value
settings := db.Settings.GetSettings()

s3 := NewS3(ak, sk, pv, rg, ib, ob)
// Get job data.
j, err := db.Jobs.GetJobByGUID(job.GUID)
if err != nil {
log.Error(err)
return err
}
encodeID := j.EncodeID

config := S3Config{
AccessKey: types.GetSetting(types.S3AccessKey, settings),
SecretKey: types.GetSetting(types.S3SecretKey, settings),
Provider: types.GetSetting(types.S3Provider, settings),
Region: types.GetSetting(types.S3OutboundBucketRegion, settings),
InboundBucket: types.GetSetting(types.S3InboundBucket, settings),
OutboundBucket: types.GetSetting(types.S3OutboundBucket, settings),
}
s3 := NewS3(config)

// Download with progress updates.
go trackTransferProgress(encodeID, s3)
err = s3.Download(job)
// NOTE(review): progressCh is a package-level channel that is not created in
// this function; closing it is presumably what stops trackTransferProgress.
// Closing a nil or already-closed channel panics — TODO confirm it is
// (re)initialised before every transfer.
close(progressCh)

return err
}

// ftpDownload downloads the job source over FTP, using the address, username
// and password read from the stored settings. It returns the error from the
// FTP download.
//
// NOTE(review): the "return s3" line below looks like a removed line from a
// rendered diff rather than part of this function — confirm against the real
// file.
func ftpDownload(job types.Job) error {
db := data.New()
addr := db.Settings.GetSetting(types.FTPAddr).Value
user := db.Settings.GetSetting(types.FTPUsername).Value
pass := db.Settings.GetSetting(types.FTPPassword).Value

return s3
f := NewFTP(addr, user, pass)
err := f.Download(job)
return err
}
177 changes: 177 additions & 0 deletions api/net/ftp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package net

import (
"bufio"
"fmt"
"io"
"net/textproto"
"net/url"
"os"
"path"
"path/filepath"
"time"

"github.com/alfg/openencoder/api/types"
"github.com/jlaffaye/ftp"
)

// ErrorFileExists is the FTP server message text that uploadFile accepts as
// "directory already exists" when creating the destination directory fails.
const ErrorFileExists = "Can't create directory: File exists"

// FTP holds the connection details used to dial and log in to an FTP server.
type FTP struct {
// Addr is the server address handed to ftp.Dial.
Addr string
// Username and Password are the credentials handed to Login.
Username string
Password string
// Timeout is multiplied by time.Second wherever a connection is dialed, so
// it holds a count of seconds rather than a ready-to-use duration.
Timeout time.Duration
}

// NewFTP returns an FTP configured with the given address and credentials.
//
// Timeout is set to the bare value 5; the methods of FTP multiply it by
// time.Second when dialing, which makes the effective dial timeout five
// seconds.
func NewFTP(addr string, username string, password string) *FTP {
	client := new(FTP)
	client.Addr = addr
	client.Username = username
	client.Password = password
	client.Timeout = 5
	return client
}

// Download retrieves job.Source from the FTP server and writes it to
// job.LocalSource.
//
// It returns the first error hit while dialing, logging in, transferring the
// file, or quitting the connection. Once dialing succeeds the control
// connection is always quit, including when login or the transfer fails.
func (f *FTP) Download(job types.Job) error {
	log.Info("downloading from FTP: ", job.Source)

	// Create FTP connection.
	c, err := ftp.Dial(f.Addr, ftp.DialWithTimeout(f.Timeout*time.Second))
	if err != nil {
		log.Error(err)
		return err
	}

	// Login, then transfer.
	err = c.Login(f.Username, f.Password)
	if err == nil {
		err = ftpRetrieveFile(c, job.Source, job.LocalSource)
	}
	if err != nil {
		log.Error(err)
	}

	// Quit connection. This runs after the data response has been closed and
	// on the failure paths too, so the connection is never left open.
	if quitErr := c.Quit(); quitErr != nil {
		log.Error(quitErr)
		if err == nil {
			err = quitErr
		}
	}
	return err
}

// ftpRetrieveFile streams the remote file src into the local file dst and
// returns the first error from the transfer or from closing either end.
func ftpRetrieveFile(c *ftp.ServerConn, src, dst string) error {
	resp, err := c.Retr(src)
	if err != nil {
		return err
	}

	// Truncate so that a shorter download does not leave stale trailing bytes
	// from a previous file at the same path.
	out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		resp.Close()
		return err
	}

	// io.Copy stops on any read or write error, not only on io.EOF.
	_, err = io.Copy(out, resp)
	if closeErr := out.Close(); err == nil {
		err = closeErr
	}
	// The data response must be closed before further commands (such as QUIT)
	// are sent on the control connection.
	if closeErr := resp.Close(); err == nil {
		err = closeErr
	}
	return err
}

// Upload uploads every non-directory entry found under the "dst" directory
// next to job.LocalSource to the FTP destination of the job.
//
// It returns an error when the output directory cannot be walked. Otherwise
// every file is attempted, and the first upload error (if any) is returned
// once all files have been tried.
func (f *FTP) Upload(job types.Job) error {
	log.Info("uploading files to FTP: ", job.Destination)

	// Get list of files in output dir.
	var filelist []string
	outputDir := path.Dir(job.LocalSource) + "/dst"
	err := filepath.Walk(outputDir, func(name string, _ os.FileInfo, walkErr error) error {
		// Propagate walk failures (e.g. a missing output directory) instead of
		// queueing an unreadable path for upload.
		if walkErr != nil {
			return walkErr
		}
		if isDirectory(name) {
			return nil
		}
		filelist = append(filelist, name)
		return nil
	})
	if err != nil {
		log.Error(err)
		return err
	}

	// Keep going after a failed file so the remaining outputs are still
	// attempted, but report the failure to the caller.
	var firstErr error
	for _, file := range filelist {
		if uploadErr := f.uploadFile(file, job); uploadErr != nil && firstErr == nil {
			firstErr = uploadErr
		}
	}
	if firstErr != nil {
		return firstErr
	}

	log.Info("upload complete")
	return nil
}

// uploadDir prints the file list to standard output and then uploads each
// entry in order with uploadFile. Errors returned by uploadFile are not
// inspected here.
func (f *FTP) uploadDir(filelist []string, job types.Job) {
	fmt.Println(filelist)
	for i := 0; i < len(filelist); i++ {
		f.uploadFile(filelist[i], job)
	}
}

// uploadFile uploads the local file at path to the directory named by the
// path component of job.Destination, creating that directory first.
//
// It opens a dedicated FTP connection for the file and quits it before
// returning. It returns the first error from parsing the destination,
// opening the file, dialing, logging in, creating the directory, or storing
// the file.
func (f *FTP) uploadFile(path string, job types.Job) error {
	// Resolve the destination and open the local file before dialing, so
	// invalid input fails without opening a connection.
	parsedURL, err := url.Parse(job.Destination)
	if err != nil {
		log.Error(err)
		return err
	}

	file, err := os.Open(path)
	if err != nil {
		log.Error(err)
		return err
	}
	defer file.Close()
	reader := bufio.NewReader(file)

	// Create FTP connection.
	c, err := ftp.Dial(f.Addr, ftp.DialWithTimeout(f.Timeout*time.Second))
	if err != nil {
		log.Error(err)
		return err
	}
	// Quit on every return path so one connection is not leaked per file.
	defer func() {
		if quitErr := c.Quit(); quitErr != nil {
			log.Error(quitErr)
		}
	}()

	// Login.
	if err = c.Login(f.Username, f.Password); err != nil {
		log.Error(err)
		return err
	}

	// Create directory. An "already exists" reply is expected on every file
	// after the first; any other failure, including non-protocol errors such
	// as a dropped connection, is returned.
	if err = c.MakeDir(parsedURL.Path); err != nil {
		protoErr, ok := err.(*textproto.Error)
		if !ok || protoErr.Msg != ErrorFileExists {
			log.Error(err)
			return err
		}
	}

	// Set destination path, making sure the directory and file name are
	// separated even when the destination has no trailing slash.
	key := parsedURL.Path
	if key != "" && key[len(key)-1] != '/' {
		key += "/"
	}
	key += filepath.Base(path)

	if err = c.Stor(key, reader); err != nil {
		log.Error(err)
		return err
	}
	return nil
}

// ListFiles lists the FTP entries for the given prefix.
//
// It opens a dedicated connection, lists the prefix, and quits the
// connection. It returns the entries on success, or nil and the first error
// from dialing, logging in, listing, or quitting.
func (f *FTP) ListFiles(prefix string) ([]*ftp.Entry, error) {
	c, err := ftp.Dial(f.Addr, ftp.DialWithTimeout(f.Timeout*time.Second))
	if err != nil {
		log.Error(err)
		return nil, err
	}

	if err = c.Login(f.Username, f.Password); err != nil {
		log.Error(err)
		// Best-effort cleanup; the login error is the one worth reporting.
		_ = c.Quit()
		return nil, err
	}

	entries, listErr := c.List(prefix)

	// Quit before inspecting the list error so the connection is released on
	// both paths.
	quitErr := c.Quit()
	if listErr != nil {
		log.Error(listErr)
		return nil, listErr
	}
	if quitErr != nil {
		log.Error(quitErr)
		return nil, quitErr
	}
	return entries, nil
}
2 changes: 2 additions & 0 deletions api/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ const (
EndpointAmazonAWS = ".amazonaws.com"
EndpointDigitalOceanSpaces = ".digitaloceanspaces.com"
PresignedDuration = 72 * time.Hour // 3 days.
ProgressInterval = time.Second * 5
)

// S3 provider endpoint builders and shared transfer state.
var (
	// EndpointDigitalOceanSpacesRegion returns the DigitalOcean Spaces
	// endpoint host for a region: "<region>" + EndpointDigitalOceanSpaces.
	EndpointDigitalOceanSpacesRegion = func(region string) string { return region + EndpointDigitalOceanSpaces }

	// EndpointAmazonAWSRegion returns the Amazon S3 endpoint host for a
	// region: "s3.<region>" + EndpointAmazonAWS.
	EndpointAmazonAWSRegion = func(region string) string { return "s3." + region + EndpointAmazonAWS }

	// progressCh is a package-level signalling channel.
	// NOTE(review): presumably used to stop transfer progress tracking —
	// confirm where it is created and closed.
	progressCh chan struct{}
)
Loading
0