@@ -19,12 +19,16 @@ import (
1919 "errors"
2020 "fmt"
2121 "io"
22+ "io/fs"
2223 "math"
24+ "os"
25+ "path/filepath"
2326 "sync"
2427 "time"
2528
2629 "cloud.google.com/go/storage"
2730 "github.com/googleapis/gax-go/v2/callctx"
31+ "google.golang.org/api/iterator"
2832)
2933
3034// Downloader manages a set of parallelized downloads.
@@ -51,24 +55,96 @@ type Downloader struct {
5155// set on the ctx may time out before the download even starts. To set a timeout
5256// that starts with the download, use the [WithPerOpTimeout()] option.
5357func (d * Downloader ) DownloadObject (ctx context.Context , input * DownloadObjectInput ) error {
54- if d .config .asynchronous && input .Callback == nil {
55- return errors .New ("transfermanager: input.Callback must not be nil when the WithCallbacks option is set" )
56- }
57- if ! d .config .asynchronous && input .Callback != nil {
58- return errors .New ("transfermanager: input.Callback must be nil unless the WithCallbacks option is set" )
58+ if d .closed () {
59+ return errors .New ("transfermanager: Downloader used after WaitAndClose was called" )
5960 }
60-
61- select {
62- case <- d .doneReceivingInputs :
63- return errors .New ("transfermanager: WaitAndClose called before DownloadObject" )
64- default :
61+ if err := d .validateObjectInput (input ); err != nil {
62+ return err
6563 }
6664
6765 input .ctx = ctx
6866 d .addInput (input )
6967 return nil
7068}
7169
70+ // DownloadDirectory queues the download of a set of objects to a local path.
71+ // This will initiate the download but is non-blocking; call Downloader.Results
72+ // or use the callback to process the result. DownloadDirectory is thread-safe
73+ // and can be called simultaneously from different goroutines.
74+ // DownloadDirectory will resolve any filters on the input and create the needed
75+ // directory structure locally as the operations progress.
76+ // Note: DownloadDirectory overwrites existing files in the directory.
77+ func (d * Downloader ) DownloadDirectory (ctx context.Context , input * DownloadDirectoryInput ) error {
78+ if d .closed () {
79+ return errors .New ("transfermanager: Downloader used after WaitAndClose was called" )
80+ }
81+ if err := d .validateDirectoryInput (input ); err != nil {
82+ return err
83+ }
84+
85+ query := & storage.Query {
86+ Prefix : input .Prefix ,
87+ StartOffset : input .StartOffset ,
88+ EndOffset : input .EndOffset ,
89+ MatchGlob : input .MatchGlob ,
90+ }
91+ if err := query .SetAttrSelection ([]string {"Name" }); err != nil {
92+ return fmt .Errorf ("transfermanager: DownloadDirectory query.SetAttrSelection: %w" , err )
93+ }
94+
95+ // TODO: Clean up any created directory structure on failure.
96+
97+ objectsToQueue := []string {}
98+ it := d .client .Bucket (input .Bucket ).Objects (ctx , query )
99+ for {
100+ attrs , err := it .Next ()
101+ if err == iterator .Done {
102+ break
103+ }
104+ if err != nil {
105+ return fmt .Errorf ("transfermanager: DownloadDirectory failed to list objects: %w" , err )
106+ }
107+
108+ objectsToQueue = append (objectsToQueue , attrs .Name )
109+ }
110+
111+ outs := make (chan DownloadOutput , len (objectsToQueue ))
112+ inputs := make ([]DownloadObjectInput , 0 , len (objectsToQueue ))
113+
114+ for _ , object := range objectsToQueue {
115+ objDirectory := filepath .Join (input .LocalDirectory , filepath .Dir (object ))
116+ filePath := filepath .Join (input .LocalDirectory , object )
117+
118+ // Make sure all directories in the object path exist.
119+ err := os .MkdirAll (objDirectory , fs .ModeDir | fs .ModePerm )
120+ if err != nil {
121+ return fmt .Errorf ("transfermanager: DownloadDirectory failed to make directory(%q): %w" , objDirectory , err )
122+ }
123+
124+ // Create file to download to.
125+ f , fErr := os .Create (filePath )
126+ if fErr != nil {
127+ return fmt .Errorf ("transfermanager: DownloadDirectory failed to create file(%q): %w" , filePath , fErr )
128+ }
129+
130+ inputs = append (inputs , DownloadObjectInput {
131+ Bucket : input .Bucket ,
132+ Object : object ,
133+ Destination : f ,
134+ Callback : input .OnObjectDownload ,
135+ ctx : ctx ,
136+ directory : true ,
137+ directoryObjectOutputs : outs ,
138+ })
139+ }
140+
141+ if d .config .asynchronous {
142+ go input .gatherObjectOutputs (outs , len (inputs ))
143+ }
144+ d .addNewInputs (inputs )
145+ return nil
146+ }
147+
72148// WaitAndClose waits for all outstanding downloads to complete and closes the
73149// Downloader. Adding new downloads after this has been called will cause an error.
74150//
@@ -143,14 +219,37 @@ func (d *Downloader) addInput(input *DownloadObjectInput) {
143219 d .inputsMu .Unlock ()
144220}
145221
222+ // addNewInputs adds a slice of inputs to the downloader.
223+ // This should only be used to queue new objects.
224+ func (d * Downloader ) addNewInputs (inputs []DownloadObjectInput ) {
225+ d .downloadsInProgress .Add (len (inputs ))
226+
227+ d .inputsMu .Lock ()
228+ d .inputs = append (d .inputs , inputs ... )
229+ d .inputsMu .Unlock ()
230+ }
231+
146232func (d * Downloader ) addResult (input * DownloadObjectInput , result * DownloadOutput ) {
233+ copiedResult := * result // make a copy so that callbacks do not affect the result
234+
235+ if input .directory {
236+ f := input .Destination .(* os.File )
237+ if err := f .Close (); err != nil && result .Err == nil {
238+ result .Err = fmt .Errorf ("closing file(%q): %w" , f .Name (), err )
239+ }
240+
241+ if d .config .asynchronous {
242+ input .directoryObjectOutputs <- copiedResult
243+ }
244+ }
147245 // TODO: check checksum if full object
148246
149- if d .config .asynchronous {
247+ if d .config .asynchronous || input . directory {
150248 input .Callback (result )
151- } else {
249+ }
250+ if ! d .config .asynchronous {
152251 d .resultsMu .Lock ()
153- d .results = append (d .results , * result )
252+ d .results = append (d .results , copiedResult )
154253 d .resultsMu .Unlock ()
155254 }
156255
@@ -260,6 +359,35 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download
260359 d .addResult (in , shardOut )
261360}
262361
362+ func (d * Downloader ) validateObjectInput (in * DownloadObjectInput ) error {
363+ if d .config .asynchronous && in .Callback == nil {
364+ return errors .New ("transfermanager: input.Callback must not be nil when the WithCallbacks option is set" )
365+ }
366+ if ! d .config .asynchronous && in .Callback != nil {
367+ return errors .New ("transfermanager: input.Callback must be nil unless the WithCallbacks option is set" )
368+ }
369+ return nil
370+ }
371+
372+ func (d * Downloader ) validateDirectoryInput (in * DownloadDirectoryInput ) error {
373+ if d .config .asynchronous && in .Callback == nil {
374+ return errors .New ("transfermanager: input.Callback must not be nil when the WithCallbacks option is set" )
375+ }
376+ if ! d .config .asynchronous && in .Callback != nil {
377+ return errors .New ("transfermanager: input.Callback must be nil unless the WithCallbacks option is set" )
378+ }
379+ return nil
380+ }
381+
382+ func (d * Downloader ) closed () bool {
383+ select {
384+ case <- d .doneReceivingInputs :
385+ return true
386+ default :
387+ return false
388+ }
389+ }
390+
263391// NewDownloader creates a new Downloader to add operations to.
264392// Choice of transport, etc is configured on the client that's passed in.
265393// The returned Downloader can be shared across goroutines to initiate downloads.
@@ -326,10 +454,12 @@ type DownloadObjectInput struct {
326454 // finish.
327455 Callback func (* DownloadOutput )
328456
329- ctx context.Context
330- cancelCtx context.CancelCauseFunc
331- shard int // the piece of the object range that should be downloaded
332- shardOutputs chan <- * DownloadOutput
457+ ctx context.Context
458+ cancelCtx context.CancelCauseFunc
459+ shard int // the piece of the object range that should be downloaded
460+ shardOutputs chan <- * DownloadOutput
461+ directory bool // input was queued by calling DownloadDirectory
462+ directoryObjectOutputs chan <- DownloadOutput
333463}
334464
335465// downloadShard will read a specific object piece into in.Destination.
@@ -402,6 +532,62 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
402532 return
403533}
404534
535+ // DownloadDirectoryInput is the input for a directory to download.
536+ type DownloadDirectoryInput struct {
537+ // Bucket is the bucket in GCS to download from. Required.
538+ Bucket string
539+
540+ // LocalDirectory specifies the directory to download the matched objects
541+ // to. Relative paths are allowed. The directory structure and contents
542+ // must not be modified while the download is in progress.
543+ // The directory will be created if it does not already exist. Required.
544+ LocalDirectory string
545+
546+ // Prefix is the prefix filter to download objects whose names begin with this.
547+ // Optional.
548+ Prefix string
549+
550+ // StartOffset is used to filter results to objects whose names are
551+ // lexicographically equal to or after startOffset. If endOffset is also
552+ // set, the objects listed will have names between startOffset (inclusive)
553+ // and endOffset (exclusive). Optional.
554+ StartOffset string
555+
556+ // EndOffset is used to filter results to objects whose names are
557+ // lexicographically before endOffset. If startOffset is also set, the
558+ // objects listed will have names between startOffset (inclusive) and
559+ // endOffset (exclusive). Optional.
560+ EndOffset string
561+
562+ // MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
563+ // https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
564+ // for syntax details. Optional.
565+ MatchGlob string
566+
567+ // Callback will run after all the objects in the directory as selected by
568+ // the provided filters are finished downloading.
569+ // It must be set if and only if the [WithCallbacks] option is set.
570+ Callback func ([]DownloadOutput )
571+
572+ // OnObjectDownload will run after every finished object download. Optional.
573+ OnObjectDownload func (* DownloadOutput )
574+ }
575+
576+ // gatherObjectOutputs receives from the given channel exactly numObjects times.
577+ // It will call the callback once all object outputs are received.
578+ // It does not do any verification on the outputs nor does it cancel other
579+ // objects on error.
580+ func (dirin * DownloadDirectoryInput ) gatherObjectOutputs (gatherOuts <- chan DownloadOutput , numObjects int ) {
581+ outs := make ([]DownloadOutput , 0 , numObjects )
582+ for i := 0 ; i < numObjects ; i ++ {
583+ obj := <- gatherOuts
584+ outs = append (outs , obj )
585+ }
586+
587+ // All objects have been gathered; execute the callback.
588+ dirin .Callback (outs )
589+ }
590+
405591// DownloadOutput provides output for a single object download, including all
406592// errors received while downloading object parts. If the download was successful,
407593// Attrs will be populated.
0 commit comments