Skip to content

Commit 7daa1bd

Browse files
authored
feat(storage/transfermanager): add SkipIfExists option (#10893)
1 parent 607534c commit 7daa1bd

3 files changed

Lines changed: 121 additions & 20 deletions

File tree

storage/transfermanager/downloader.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,22 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec
161161
}
162162

163163
// Check if the file exists.
164-
// TODO: add skip option.
164+
fileExists := false
165+
165166
filePath := filepath.Join(input.LocalDirectory, attrs.Name)
166167
if _, err := os.Stat(filePath); err == nil {
167-
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
168+
fileExists = true
169+
if !d.config.skipIfExists {
170+
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
171+
}
168172
} else if !errors.Is(err, os.ErrNotExist) {
173+
// os.Stat failed with an error other than "file does not exist".
169174
return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, err)
170175
}
171176

172-
objectsToQueue = append(objectsToQueue, attrs.Name)
177+
if !(d.config.skipIfExists && fileExists) {
178+
objectsToQueue = append(objectsToQueue, attrs.Name)
179+
}
173180
}
174181

175182
outs := make(chan DownloadOutput, len(objectsToQueue))

storage/transfermanager/integration_test.go

Lines changed: 92 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -185,22 +185,37 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
185185
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) {
186186
localDir := t.TempDir()
187187

188-
numCallbacks := 0
189-
callbackMu := sync.Mutex{}
190-
191188
d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2), WithCallbacks())
192189
if err != nil {
193190
t.Fatalf("NewDownloader: %v", err)
194191
}
195192

193+
// Test download options - only download directory "dir".
196194
// In lex order we have:
197-
// "dir/nested/again/obj1", -- excluded
195+
// "dir/nested/again/obj1", -- excluded by StartOffset
198196
// "dir/nested/objA", -- included
199197
// "dir/file" -- excluded by MatchGlob
200198
// "dir/objA", -- included
201199
// "dir/objB", -- included
202-
// "dir/objC", -- excluded
203-
wantObjs := 3
200+
// "dir/objC", -- excluded by EndOffset
201+
includedObjs := []string{"dir/nested/objA", "dir/objA", "dir/objB"}
202+
203+
objectDownloaded := make(chan bool)
204+
done := make(chan bool)
205+
206+
trackObjectsDownloaded := func(numObjects *int) {
207+
for {
208+
select {
209+
case <-done:
210+
return
211+
case <-objectDownloaded:
212+
*numObjects++
213+
}
214+
}
215+
}
216+
217+
objectsDownloaded := 0
218+
go trackObjectsDownloaded(&objectsDownloaded)
204219

205220
if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
206221
Bucket: tb.bucket,
@@ -210,9 +225,7 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
210225
EndOffset: "dir/objC",
211226
MatchGlob: "**obj**",
212227
OnObjectDownload: func(got *DownloadOutput) {
213-
callbackMu.Lock()
214-
numCallbacks++
215-
callbackMu.Unlock()
228+
objectDownloaded <- true
216229

217230
if got.Err != nil {
218231
t.Errorf("result.Err: %v", got.Err)
@@ -240,8 +253,8 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
240253
got.Object = "modifying this shouldn't be a problem"
241254
},
242255
Callback: func(outs []DownloadOutput) {
243-
if len(outs) != wantObjs {
244-
t.Errorf("expected to receive %d results, got %d results", wantObjs, len(outs))
256+
if len(outs) != len(includedObjs) {
257+
t.Errorf("expected to receive %d results, got %d results", len(includedObjs), len(outs))
245258
}
246259

247260
for _, got := range outs {
@@ -275,21 +288,22 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
275288
t.Errorf("d.DownloadDirectory: %v", err)
276289
}
277290

278-
if _, err = d.WaitAndClose(); err != nil {
279-
t.Fatalf("d.WaitAndClose: %v", err)
291+
if _, err := d.WaitAndClose(); err != nil {
292+
t.Errorf("d.WaitAndClose: %v", err)
280293
}
294+
done <- true
281295

282-
if numCallbacks != wantObjs {
283-
t.Errorf("expected to receive %d results, got %d callbacks", (wantObjs), numCallbacks)
296+
if want, got := len(includedObjs), objectsDownloaded; want != got {
297+
t.Errorf("expected to receive %d callbacks, got %d", want, got)
284298
}
285299

286300
entries, err := os.ReadDir(filepath.Join(localDir, "dir"))
287301
if err != nil {
288302
t.Fatalf("os.ReadDir: %v", err)
289303
}
290304

291-
if len(entries) != wantObjs {
292-
t.Errorf("expected %d entries in dir, got %d", (wantObjs), len(entries))
305+
if len(entries) != len(includedObjs) {
306+
t.Errorf("expected %d entries in dir, got %d", len(includedObjs), len(entries))
293307
}
294308

295309
for _, entry := range entries {
@@ -300,6 +314,65 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
300314
t.Errorf("unexpected file %q in dir", entry.Name())
301315
}
302316
}
317+
318+
// Test SkipIfExists() by attempting to download the entire directory.
319+
// The files previously downloaded (i.e. includedObjs) should be skipped.
320+
d, err = NewDownloader(c, WithWorkers(2), SkipIfExists())
321+
if err != nil {
322+
t.Fatalf("NewDownloader: %v", err)
323+
}
324+
325+
objectsDownloaded = 0
326+
go trackObjectsDownloaded(&objectsDownloaded)
327+
328+
if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
329+
Bucket: tb.bucket,
330+
LocalDirectory: localDir,
331+
OnObjectDownload: func(got *DownloadOutput) {
332+
objectDownloaded <- true
333+
334+
if got.Err != nil {
335+
t.Errorf("result.Err: %v", got.Err)
336+
}
337+
338+
for _, obj := range includedObjs {
339+
if strings.EqualFold(got.Object, obj) {
340+
t.Errorf("should have skipped download of object %s", got.Object)
341+
}
342+
}
343+
344+
if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got {
345+
t.Errorf("expected object size %d, got %d", want, got)
346+
}
347+
348+
path := filepath.Join(localDir, got.Object)
349+
f, err := os.Open(path)
350+
if err != nil {
351+
t.Errorf("os.Open(%q): %v", path, err)
352+
}
353+
defer f.Close()
354+
355+
b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size))
356+
if _, err := io.Copy(b, f); err != nil {
357+
t.Errorf("io.Copy: %v", err)
358+
}
359+
360+
if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC {
361+
t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC)
362+
}
363+
},
364+
}); err != nil {
365+
t.Errorf("d.DownloadDirectory: %v", err)
366+
}
367+
368+
if _, err := d.WaitAndClose(); err != nil {
369+
t.Errorf("d.WaitAndClose: %v", err)
370+
}
371+
done <- true
372+
373+
if want, got := len(tb.objects)-len(includedObjs), objectsDownloaded; want != got {
374+
t.Errorf("expected to receive %d callbacks, got %d", want, got)
375+
}
303376
})
304377
}
305378

@@ -1007,6 +1080,8 @@ func (tb *downloadTestBucket) Create(prefix string) error {
10071080
ctx := context.Background()
10081081

10091082
tb.bucket = prefix + uidSpace.New()
1083+
// Note: some tests depend on these object names.
1084+
// Verify that tests pass when adding or removing objects.
10101085
tb.objects = []string{
10111086
"!#$&'()*+,:;=,?@,[] and spaces",
10121087
"./obj",

storage/transfermanager/option.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,21 @@ func (wps withPartSize) apply(tm *transferManagerConfig) {
9191
tm.partSize = wps.partSize
9292
}
9393

94+
// SkipIfExists returns an Option that will not download files
95+
// that already exist in the local directory.
96+
//
97+
// By default, if a file already exists the operation will abort and return an error.
98+
func SkipIfExists() Option {
99+
return &skipIfExists{}
100+
}
101+
102+
type skipIfExists struct {
103+
}
104+
105+
func (sie skipIfExists) apply(tm *transferManagerConfig) {
106+
tm.skipIfExists = true
107+
}
108+
94109
type transferManagerConfig struct {
95110
// Workers in thread pool; default numCPU/2 based on previous benchmarks?
96111
numWorkers int
@@ -107,6 +122,10 @@ type transferManagerConfig struct {
107122
// If true, callbacks are used instead of returning results synchronously
108123
// in a slice at the end.
109124
asynchronous bool
125+
126+
// If true, files that already exist in the local directory will not be
127+
// downloaded.
128+
skipIfExists bool
110129
}
111130

112131
func defaultTransferManagerConfig() *transferManagerConfig {

0 commit comments

Comments
 (0)