feat(alias): add `DownloadConcurrency` and `DownloadPartSize` option … · AlistGo/alist@2be0c3d · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 2be0c3d

Browse files
j2rong4cn and hshpy authored
feat(alias): add DownloadConcurrency and DownloadPartSize option (#7829)
* fix(net): goroutine logic bug (#7215)
* Fix goroutine logic bug
* Fix bug
---------
Co-authored-by: hpy hs <hshpy.pengyu@gmail.com>
* perf(net): sequential and dynamic concurrency
* fix(net): incorrect error return
* feat(alias): add `DownloadConcurrency` and `DownloadPartSize` option
* feat(net): add `ConcurrencyLimit`
* perf(net): create `chunk` on demand
* refactor
* refactor
* fix(net): `r.Closers.Add` has no effect
* refactor
---------
Co-authored-by: hpy hs <hshpy.pengyu@gmail.com>
1 parent bdcf450 commit 2be0c3d

File tree

24 files changed

+396
-238
lines changed

24 files changed

+396
-238
lines changed

drivers/alias/driver.go

+10
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ func (d *Alias) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
110110
for _, dst := range dsts {
111111
link, err := d.link(ctx, dst, sub, args)
112112
if err == nil {
113+
if !args.Redirect && len(link.URL) > 0 {
114+
// 正常情况下 多并发 仅支持返回URL的驱动
115+
// alias套娃alias 可以让crypt、mega等驱动(不返回URL的) 支持并发
116+
if d.DownloadConcurrency > 0 {
117+
link.Concurrency = d.DownloadConcurrency
118+
}
119+
if d.DownloadPartSize > 0 {
120+
link.PartSize = d.DownloadPartSize * utils.KB
121+
}
122+
}
113123
return link, nil
114124
}
115125
}

drivers/alias/meta.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ type Addition struct {
99
// Usually one of two
1010
// driver.RootPath
1111
// define other
12-
Paths string `json:"paths" required:"true" type:"text"`
13-
ProtectSameName bool `json:"protect_same_name" default:"true" required:"false" help:"Protects same-name files from Delete or Rename"`
12+
Paths string `json:"paths" required:"true" type:"text"`
13+
ProtectSameName bool `json:"protect_same_name" default:"true" required:"false" help:"Protects same-name files from Delete or Rename"`
14+
DownloadConcurrency int `json:"download_concurrency" default:"0" required:"false" type:"number" help:"Need to enable proxy"`
15+
DownloadPartSize int `json:"download_part_size" default:"0" type:"number" required:"false" help:"Need to enable proxy. Unit: KB"`
1416
}
1517

1618
var config = driver.Config{

drivers/alias/util.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/alist-org/alist/v3/internal/errs"
1010
"github.com/alist-org/alist/v3/internal/fs"
1111
"github.com/alist-org/alist/v3/internal/model"
12+
"github.com/alist-org/alist/v3/internal/op"
1213
"github.com/alist-org/alist/v3/internal/sign"
1314
"github.com/alist-org/alist/v3/pkg/utils"
1415
"github.com/alist-org/alist/v3/server/common"
@@ -94,10 +95,15 @@ func (d *Alias) list(ctx context.Context, dst, sub string, args *fs.ListArgs) ([
9495

9596
func (d *Alias) link(ctx context.Context, dst, sub string, args model.LinkArgs) (*model.Link, error) {
9697
reqPath := stdpath.Join(dst, sub)
97-
storage, err := fs.GetStorage(reqPath, &fs.GetStoragesArgs{})
98+
// 参考 crypt 驱动
99+
storage, reqActualPath, err := op.GetStorageAndActualPath(reqPath)
98100
if err != nil {
99101
return nil, err
100102
}
103+
if _, ok := storage.(*Alias); !ok && !args.Redirect {
104+
link, _, err := op.Link(ctx, storage, reqActualPath, args)
105+
return link, err
106+
}
101107
_, err = fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true})
102108
if err != nil {
103109
return nil, err
@@ -114,7 +120,7 @@ func (d *Alias) link(ctx context.Context, dst, sub string, args model.LinkArgs)
114120
}
115121
return link, nil
116122
}
117-
link, _, err := fs.Link(ctx, reqPath, args)
123+
link, _, err := op.Link(ctx, storage, reqActualPath, args)
118124
return link, err
119125
}
120126

drivers/crypt/driver.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
275275
rrc = converted
276276
}
277277
if rrc != nil {
278-
//remoteRangeReader, err :=
279278
remoteReader, err := rrc.RangeRead(ctx, http_range.Range{Start: underlyingOffset, Length: length})
280279
remoteClosers.AddClosers(rrc.GetClosers())
281280
if err != nil {
@@ -288,10 +287,8 @@ func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
288287
if err != nil {
289288
return nil, err
290289
}
291-
//remoteClosers.Add(remoteLink.MFile)
292-
//keep reuse same MFile and close at last.
293-
remoteClosers.Add(remoteLink.MFile)
294-
return io.NopCloser(remoteLink.MFile), nil
290+
// 可以直接返回,读取完也不会调用Close,直到连接断开Close
291+
return remoteLink.MFile, nil
295292
}
296293

297294
return nil, errs.NotSupport

drivers/github/driver.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,20 @@ import (
55
"encoding/base64"
66
"errors"
77
"fmt"
8+
"io"
9+
"net/http"
10+
stdpath "path"
11+
"strings"
12+
"sync"
13+
"text/template"
14+
815
"github.com/alist-org/alist/v3/drivers/base"
916
"github.com/alist-org/alist/v3/internal/driver"
1017
"github.com/alist-org/alist/v3/internal/errs"
1118
"github.com/alist-org/alist/v3/internal/model"
1219
"github.com/alist-org/alist/v3/pkg/utils"
1320
"github.com/go-resty/resty/v2"
1421
log "github.com/sirupsen/logrus"
15-
"io"
16-
"net/http"
17-
stdpath "path"
18-
"strings"
19-
"sync"
20-
"text/template"
2122
)
2223

2324
type Github struct {
@@ -656,7 +657,7 @@ func (d *Github) putBlob(ctx context.Context, stream model.FileStreamer, up driv
656657
contentReader, contentWriter := io.Pipe()
657658
go func() {
658659
encoder := base64.NewEncoder(base64.StdEncoding, contentWriter)
659-
if _, err := io.Copy(encoder, stream); err != nil {
660+
if _, err := utils.CopyWithBuffer(encoder, stream); err != nil {
660661
_ = contentWriter.CloseWithError(err)
661662
return
662663
}

drivers/halalcloud/driver.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@ import (
44
"context"
55
"crypto/sha1"
66
"fmt"
7+
"io"
8+
"net/url"
9+
"path"
10+
"strconv"
11+
"time"
12+
713
"github.com/alist-org/alist/v3/drivers/base"
814
"github.com/alist-org/alist/v3/internal/driver"
915
"github.com/alist-org/alist/v3/internal/model"
1016
"github.com/alist-org/alist/v3/internal/op"
1117
"github.com/alist-org/alist/v3/pkg/http_range"
12-
"github.com/alist-org/alist/v3/pkg/utils"
1318
"github.com/aws/aws-sdk-go/aws"
1419
"github.com/aws/aws-sdk-go/aws/credentials"
1520
"github.com/aws/aws-sdk-go/aws/session"
@@ -19,11 +24,6 @@ import (
1924
pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile"
2025
"github.com/rclone/rclone/lib/readers"
2126
"github.com/zzzhr1990/go-common-entity/userfile"
22-
"io"
23-
"net/url"
24-
"path"
25-
"strconv"
26-
"time"
2727
)
2828

2929
type HalalCloud struct {
@@ -251,7 +251,6 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin
251251

252252
size := result.FileSize
253253
chunks := getChunkSizes(result.Sizes)
254-
var finalClosers utils.Closers
255254
resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
256255
length := httpRange.Length
257256
if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size {
@@ -269,7 +268,6 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin
269268
sha: result.Sha1,
270269
shaTemp: sha1.New(),
271270
}
272-
finalClosers.Add(oo)
273271

274272
return readers.NewLimitedReadCloser(oo, length), nil
275273
}
@@ -281,7 +279,7 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin
281279
duration = time.Until(time.Now().Add(time.Hour))
282280
}
283281

284-
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader, Closers: finalClosers}
282+
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader}
285283
return &model.Link{
286284
RangeReadCloser: resultRangeReadCloser,
287285
Expiration: &duration,

drivers/mega/driver.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func (d *Mega) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*
8484
//}
8585

8686
size := file.GetSize()
87-
var finalClosers utils.Closers
8887
resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
8988
length := httpRange.Length
9089
if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size {
@@ -103,11 +102,10 @@ func (d *Mega) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*
103102
d: down,
104103
skip: httpRange.Start,
105104
}
106-
finalClosers.Add(oo)
107105

108106
return readers.NewLimitedReadCloser(oo, length), nil
109107
}
110-
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader, Closers: finalClosers}
108+
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader}
111109
resultLink := &model.Link{
112110
RangeReadCloser: resultRangeReadCloser,
113111
}

drivers/netease_music/types.go

-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func (lrc *LyricObj) getLyricLink() *model.Link {
6464
sr := io.NewSectionReader(reader, httpRange.Start, httpRange.Length)
6565
return io.NopCloser(sr), nil
6666
},
67-
Closers: utils.EmptyClosers(),
6867
},
6968
}
7069
}

drivers/netease_music/upload.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (u *uploader) init(stream model.FileStreamer) error {
4747
}
4848

4949
h := md5.New()
50-
io.Copy(h, stream)
50+
utils.CopyWithBuffer(h, stream)
5151
u.md5 = hex.EncodeToString(h.Sum(nil))
5252
_, err := u.file.Seek(0, io.SeekStart)
5353
if err != nil {

drivers/quqi/util.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,7 @@ func (d *Quqi) linkFromCDN(id string) (*model.Link, error) {
300300
bufferReader := bufio.NewReader(decryptReader)
301301
bufferReader.Discard(int(decryptedOffset))
302302

303-
return utils.NewReadCloser(bufferReader, func() error {
304-
return nil
305-
}), nil
303+
return io.NopCloser(bufferReader), nil
306304
}
307305

308306
return &model.Link{

internal/bootstrap/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/alist-org/alist/v3/cmd/flags"
1010
"github.com/alist-org/alist/v3/drivers/base"
1111
"github.com/alist-org/alist/v3/internal/conf"
12+
"github.com/alist-org/alist/v3/internal/net"
1213
"github.com/alist-org/alist/v3/pkg/utils"
1314
"github.com/caarlos0/env/v9"
1415
log "github.com/sirupsen/logrus"
@@ -63,6 +64,9 @@ func InitConfig() {
6364
log.Fatalf("update config struct error: %+v", err)
6465
}
6566
}
67+
if conf.Conf.MaxConcurrency > 0 {
68+
net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency}
69+
}
6670
if !conf.Conf.Force {
6771
confFromEnv()
6872
}

internal/conf/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type Config struct {
106106
Log LogConfig `json:"log"`
107107
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
108108
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
109+
MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"`
109110
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
110111
Tasks TasksConfig `json:"tasks" envPrefix:"TASKS_"`
111112
Cors Cors `json:"cors" envPrefix:"CORS_"`
@@ -151,6 +152,7 @@ func DefaultConfig() *Config {
151152
MaxAge: 28,
152153
},
153154
MaxConnections: 0,
155+
MaxConcurrency: 64,
154156
TlsInsecureSkipVerify: true,
155157
Tasks: TasksConfig{
156158
Download: TaskConfig{

internal/model/args.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ type ListArgs struct {
1717
}
1818

1919
type LinkArgs struct {
20-
IP string
21-
Header http.Header
22-
Type string
23-
HttpReq *http.Request
20+
IP string
21+
Header http.Header
22+
Type string
23+
HttpReq *http.Request
24+
Redirect bool
2425
}
2526

2627
type Link struct {
@@ -87,7 +88,7 @@ type RangeReadCloser struct {
8788
utils.Closers
8889
}
8990

90-
func (r RangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
91+
func (r *RangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
9192
rc, err := r.RangeReader(ctx, httpRange)
9293
r.Closers.Add(rc)
9394
return rc, err

0 commit comments

Comments
 (0)
0