Skip to content

Commit 0d0e5dd

Browse files
authored
feat(storage/transfermanager): add DownloadDirectory (#10430)
DownloadDirectory is a convenience function that allows selection of multiple objects to be downloaded in parallel to a local directory.
1 parent b660d68 commit 0d0e5dd

4 files changed

Lines changed: 430 additions & 23 deletions

File tree

storage/transfermanager/downloader.go

Lines changed: 203 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ import (
1919
"errors"
2020
"fmt"
2121
"io"
22+
"io/fs"
2223
"math"
24+
"os"
25+
"path/filepath"
2326
"sync"
2427
"time"
2528

2629
"cloud.google.com/go/storage"
2730
"github.com/googleapis/gax-go/v2/callctx"
31+
"google.golang.org/api/iterator"
2832
)
2933

3034
// Downloader manages a set of parallelized downloads.
@@ -51,24 +55,96 @@ type Downloader struct {
5155
// set on the ctx may time out before the download even starts. To set a timeout
5256
// that starts with the download, use the [WithPerOpTimeout()] option.
5357
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error {
54-
if d.config.asynchronous && input.Callback == nil {
55-
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
56-
}
57-
if !d.config.asynchronous && input.Callback != nil {
58-
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
58+
if d.closed() {
59+
return errors.New("transfermanager: Downloader used after WaitAndClose was called")
5960
}
60-
61-
select {
62-
case <-d.doneReceivingInputs:
63-
return errors.New("transfermanager: WaitAndClose called before DownloadObject")
64-
default:
61+
if err := d.validateObjectInput(input); err != nil {
62+
return err
6563
}
6664

6765
input.ctx = ctx
6866
d.addInput(input)
6967
return nil
7068
}
7169

70+
// DownloadDirectory queues the download of a set of objects to a local path.
71+
// This will initiate the download but is non-blocking; call Downloader.Results
72+
// or use the callback to process the result. DownloadDirectory is thread-safe
73+
// and can be called simultaneously from different goroutines.
74+
// DownloadDirectory will resolve any filters on the input and create the needed
75+
// directory structure locally as the operations progress.
76+
// Note: DownloadDirectory overwrites existing files in the directory.
77+
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error {
78+
if d.closed() {
79+
return errors.New("transfermanager: Downloader used after WaitAndClose was called")
80+
}
81+
if err := d.validateDirectoryInput(input); err != nil {
82+
return err
83+
}
84+
85+
query := &storage.Query{
86+
Prefix: input.Prefix,
87+
StartOffset: input.StartOffset,
88+
EndOffset: input.EndOffset,
89+
MatchGlob: input.MatchGlob,
90+
}
91+
if err := query.SetAttrSelection([]string{"Name"}); err != nil {
92+
return fmt.Errorf("transfermanager: DownloadDirectory query.SetAttrSelection: %w", err)
93+
}
94+
95+
// TODO: Clean up any created directory structure on failure.
96+
97+
objectsToQueue := []string{}
98+
it := d.client.Bucket(input.Bucket).Objects(ctx, query)
99+
for {
100+
attrs, err := it.Next()
101+
if err == iterator.Done {
102+
break
103+
}
104+
if err != nil {
105+
return fmt.Errorf("transfermanager: DownloadDirectory failed to list objects: %w", err)
106+
}
107+
108+
objectsToQueue = append(objectsToQueue, attrs.Name)
109+
}
110+
111+
outs := make(chan DownloadOutput, len(objectsToQueue))
112+
inputs := make([]DownloadObjectInput, 0, len(objectsToQueue))
113+
114+
for _, object := range objectsToQueue {
115+
objDirectory := filepath.Join(input.LocalDirectory, filepath.Dir(object))
116+
filePath := filepath.Join(input.LocalDirectory, object)
117+
118+
// Make sure all directories in the object path exist.
119+
err := os.MkdirAll(objDirectory, fs.ModeDir|fs.ModePerm)
120+
if err != nil {
121+
return fmt.Errorf("transfermanager: DownloadDirectory failed to make directory(%q): %w", objDirectory, err)
122+
}
123+
124+
// Create file to download to.
125+
f, fErr := os.Create(filePath)
126+
if fErr != nil {
127+
return fmt.Errorf("transfermanager: DownloadDirectory failed to create file(%q): %w", filePath, fErr)
128+
}
129+
130+
inputs = append(inputs, DownloadObjectInput{
131+
Bucket: input.Bucket,
132+
Object: object,
133+
Destination: f,
134+
Callback: input.OnObjectDownload,
135+
ctx: ctx,
136+
directory: true,
137+
directoryObjectOutputs: outs,
138+
})
139+
}
140+
141+
if d.config.asynchronous {
142+
go input.gatherObjectOutputs(outs, len(inputs))
143+
}
144+
d.addNewInputs(inputs)
145+
return nil
146+
}
147+
72148
// WaitAndClose waits for all outstanding downloads to complete and closes the
73149
// Downloader. Adding new downloads after this has been called will cause an error.
74150
//
@@ -143,14 +219,37 @@ func (d *Downloader) addInput(input *DownloadObjectInput) {
143219
d.inputsMu.Unlock()
144220
}
145221

222+
// addNewInputs adds a slice of inputs to the downloader.
223+
// This should only be used to queue new objects.
224+
func (d *Downloader) addNewInputs(inputs []DownloadObjectInput) {
225+
d.downloadsInProgress.Add(len(inputs))
226+
227+
d.inputsMu.Lock()
228+
d.inputs = append(d.inputs, inputs...)
229+
d.inputsMu.Unlock()
230+
}
231+
146232
func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutput) {
233+
copiedResult := *result // make a copy so that callbacks do not affect the result
234+
235+
if input.directory {
236+
f := input.Destination.(*os.File)
237+
if err := f.Close(); err != nil && result.Err == nil {
238+
result.Err = fmt.Errorf("closing file(%q): %w", f.Name(), err)
239+
}
240+
241+
if d.config.asynchronous {
242+
input.directoryObjectOutputs <- copiedResult
243+
}
244+
}
147245
// TODO: check checksum if full object
148246

149-
if d.config.asynchronous {
247+
if d.config.asynchronous || input.directory {
150248
input.Callback(result)
151-
} else {
249+
}
250+
if !d.config.asynchronous {
152251
d.resultsMu.Lock()
153-
d.results = append(d.results, *result)
252+
d.results = append(d.results, copiedResult)
154253
d.resultsMu.Unlock()
155254
}
156255

@@ -260,6 +359,35 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download
260359
d.addResult(in, shardOut)
261360
}
262361

362+
func (d *Downloader) validateObjectInput(in *DownloadObjectInput) error {
363+
if d.config.asynchronous && in.Callback == nil {
364+
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
365+
}
366+
if !d.config.asynchronous && in.Callback != nil {
367+
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
368+
}
369+
return nil
370+
}
371+
372+
func (d *Downloader) validateDirectoryInput(in *DownloadDirectoryInput) error {
373+
if d.config.asynchronous && in.Callback == nil {
374+
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
375+
}
376+
if !d.config.asynchronous && in.Callback != nil {
377+
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
378+
}
379+
return nil
380+
}
381+
382+
func (d *Downloader) closed() bool {
383+
select {
384+
case <-d.doneReceivingInputs:
385+
return true
386+
default:
387+
return false
388+
}
389+
}
390+
263391
// NewDownloader creates a new Downloader to add operations to.
264392
// Choice of transport, etc is configured on the client that's passed in.
265393
// The returned Downloader can be shared across goroutines to initiate downloads.
@@ -326,10 +454,12 @@ type DownloadObjectInput struct {
326454
// finish.
327455
Callback func(*DownloadOutput)
328456

329-
ctx context.Context
330-
cancelCtx context.CancelCauseFunc
331-
shard int // the piece of the object range that should be downloaded
332-
shardOutputs chan<- *DownloadOutput
457+
ctx context.Context
458+
cancelCtx context.CancelCauseFunc
459+
shard int // the piece of the object range that should be downloaded
460+
shardOutputs chan<- *DownloadOutput
461+
directory bool // input was queued by calling DownloadDirectory
462+
directoryObjectOutputs chan<- DownloadOutput
333463
}
334464

335465
// downloadShard will read a specific object piece into in.Destination.
@@ -402,6 +532,62 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
402532
return
403533
}
404534

535+
// DownloadDirectoryInput is the input for a directory to download: it selects
// a set of objects in a bucket and names the local directory they are
// written to.
type DownloadDirectoryInput struct {
	// Bucket is the bucket in GCS to download from. Required.
	Bucket string

	// LocalDirectory specifies the directory to download the matched objects
	// to. Relative paths are allowed. The directory structure and contents
	// must not be modified while the download is in progress.
	// The directory will be created if it does not already exist. Required.
	LocalDirectory string

	// Prefix is the prefix filter to download objects whose names begin with this.
	// Optional.
	Prefix string

	// StartOffset is used to filter results to objects whose names are
	// lexicographically equal to or after startOffset. If endOffset is also
	// set, the objects listed will have names between startOffset (inclusive)
	// and endOffset (exclusive). Optional.
	StartOffset string

	// EndOffset is used to filter results to objects whose names are
	// lexicographically before endOffset. If startOffset is also set, the
	// objects listed will have names between startOffset (inclusive) and
	// endOffset (exclusive). Optional.
	EndOffset string

	// MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
	// https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
	// for syntax details. Optional.
	MatchGlob string

	// Callback will run after all the objects in the directory as selected by
	// the provided filters are finished downloading. It receives one
	// DownloadOutput per object, in the order the results were gathered.
	// It must be set if and only if the [WithCallbacks] option is set.
	Callback func([]DownloadOutput)

	// OnObjectDownload will run after every finished object download. Optional.
	OnObjectDownload func(*DownloadOutput)
}
575+
576+
// gatherObjectOutputs receives from the given channel exactly numObjects times.
577+
// It will call the callback once all object outputs are received.
578+
// It does not do any verification on the outputs nor does it cancel other
579+
// objects on error.
580+
func (dirin *DownloadDirectoryInput) gatherObjectOutputs(gatherOuts <-chan DownloadOutput, numObjects int) {
581+
outs := make([]DownloadOutput, 0, numObjects)
582+
for i := 0; i < numObjects; i++ {
583+
obj := <-gatherOuts
584+
outs = append(outs, obj)
585+
}
586+
587+
// All objects have been gathered; execute the callback.
588+
dirin.Callback(outs)
589+
}
590+
405591
// DownloadOutput provides output for a single object download, including all
406592
// errors received while downloading object parts. If the download was successful,
407593
// Attrs will be populated.

storage/transfermanager/downloader_buffer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func TestDownloadBufferParallel(t *testing.T) {
132132
if err != nil {
133133
t.Errorf("b.WriteAt: %v", err)
134134
}
135-
if n != 5 {
135+
if n != step {
136136
t.Errorf("expected to write 5 bytes, got %d", n)
137137
}
138138
wg.Done()

storage/transfermanager/downloader_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestWaitAndClose(t *testing.T) {
3535
t.Fatalf("WaitAndClose: %v", err)
3636
}
3737

38-
expectedErr := "transfermanager: WaitAndClose called before DownloadObject"
38+
expectedErr := "transfermanager: Downloader used after WaitAndClose was called"
3939
err = d.DownloadObject(context.Background(), &DownloadObjectInput{})
4040
if err == nil {
4141
t.Fatalf("d.DownloadObject err was nil, should be %q", expectedErr)

0 commit comments

Comments
 (0)