Skip to content

Commit 015b52c

Browse files
authored
feat(storage/dataflux): add worksteal algorithm to fast-listing (#10913)
1 parent ece7426 commit 015b52c

5 files changed

Lines changed: 483 additions & 31 deletions

File tree

storage/dataflux/fast_list.go

Lines changed: 98 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"runtime"
22+
"strings"
2123

2224
"cloud.google.com/go/storage"
2325
"golang.org/x/sync/errgroup"
@@ -41,53 +43,80 @@ type ListerInput struct {
4143
// BucketName is the name of the bucket to list objects from. Required.
4244
BucketName string
4345

44-
// Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
46+
// Parallelism is number of parallel workers to use for listing.
47+
// Default value is 10x number of available CPU. Optional.
4548
Parallelism int
4649

47-
// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
48-
// The number of objects returned will be rounded up to a multiple of gcs page size.
50+
// BatchSize is the number of objects to list. Default value returns
51+
// all objects at once. The number of objects returned will be
52+
// rounded up to a multiple of gcs page size. Optional.
4953
BatchSize int
5054

51-
// Query is the query to filter objects for listing. Default value is nil. Optional.
52-
// Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects.
55+
// Query is the query to filter objects for listing. Default value is nil.
56+
// Use ProjectionNoACL for faster listing. Including ACLs increases
57+
// latency while fetching objects. Optional.
5358
Query storage.Query
5459

55-
// SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
60+
// SkipDirectoryObjects is to indicate whether to list directory objects.
61+
// Default value is false. Optional.
5662
SkipDirectoryObjects bool
5763
}
5864

59-
// Lister is used for interacting with Dataflux fast-listing.
60-
// The caller should initialize it with NewLister() instead of creating it directly.
65+
// Lister is used for interacting with Dataflux fast-listing. The caller should
66+
// initialize it with NewLister() instead of creating it directly.
6167
type Lister struct {
62-
// method indicates the listing method(open, sequential, worksteal) to be used for listing.
68+
// method indicates the listing method(open, sequential, worksteal) to
69+
// be used for listing.
6370
method listingMethod
6471

65-
// pageToken is the token to use for sequential listing.
66-
pageToken string
67-
6872
// bucket is the bucket handle to list objects from.
6973
bucket *storage.BucketHandle
7074

7175
// batchSize is the number of objects to list.
7276
batchSize int
7377

78+
// parallelism is number of parallel workers to use for listing.
79+
parallelism int
80+
7481
// query is the query to filter objects for listing.
7582
query storage.Query
7683

84+
// pageToken is the token to use for sequential listing.
85+
pageToken string
86+
87+
// ranges is the channel to store the start and end ranges to be listed
88+
// by the workers in worksteal listing.
89+
ranges chan *listRange
90+
7791
// skipDirectoryObjects is to indicate whether to list directory objects.
7892
skipDirectoryObjects bool
7993
}
8094

8195
// NewLister creates a new dataflux Lister to list objects in the give bucket.
8296
func NewLister(c *storage.Client, in *ListerInput) *Lister {
8397
bucket := c.Bucket(in.BucketName)
98+
99+
// If parallelism is not given, set default value to 10x the number of
100+
// available CPU.
101+
if in.Parallelism == 0 {
102+
in.Parallelism = runtime.NumCPU() * 10
103+
}
104+
// Initialize range channel with entire namespace of object for given
105+
// prefix, startoffset and endoffset. For the default range to list is
106+
// entire namespace, start and end will be empty.
107+
rangeChannel := make(chan *listRange, in.Parallelism*2)
108+
start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
109+
rangeChannel <- &listRange{startRange: start, endRange: end}
110+
84111
lister := &Lister{
85112
method: open,
113+
parallelism: in.Parallelism,
86114
pageToken: "",
87115
bucket: bucket,
88116
batchSize: in.BatchSize,
89117
query: in.Query,
90118
skipDirectoryObjects: in.SkipDirectoryObjects,
119+
ranges: rangeChannel,
91120
}
92121
return lister
93122
}
@@ -102,13 +131,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
102131
var results []*storage.ObjectAttrs
103132
ctx, cancel := context.WithCancel(ctx)
104133
defer cancel()
105-
// Errgroup takes care of running both methods in parallel. As soon as one of the method
106-
// is complete, the running method also stops.
134+
// Errgroup takes care of running both methods in parallel. As soon as one of
135+
// the method is complete, the running method also stops.
107136
g, childCtx := errgroup.WithContext(ctx)
108137

109-
// To start listing method is Open and runs both worksteal and sequential listing in parallel.
110-
// The method which completes first is used for all subsequent runs.
138+
// To start listing method is Open and runs both worksteal and sequential listing
139+
// in parallel. The method which completes first is used for all subsequent runs.
140+
111141
// TODO: Run worksteal listing when method is Open or WorkSteal.
142+
112143
// Run sequential listing when method is Open or Sequential.
113144
if c.method != worksteal {
114145

@@ -118,8 +149,8 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
118149
countError++
119150
return fmt.Errorf("error in running sequential listing: %w", err)
120151
}
121-
// If sequential listing completes first, set method to sequential listing and ranges to nil.
122-
// The nextToken will be used to continue sequential listing.
152+
// If sequential listing completes first, set method to sequential listing
153+
// and ranges to nil. The nextToken will be used to continue sequential listing.
123154
results = objects
124155
c.pageToken = nextToken
125156
c.method = sequential
@@ -135,13 +166,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
135166
// If the error is not context.Canceled, then return error instead of falling back
136167
// to the other method. This is so that the error can be fixed and user can take
137168
// advantage of fast-listing.
138-
// As one of the listing method completes, it is expected to cancel context for the other method.
139-
// If both sequential and worksteal listing fail due to context canceled, only then return error.
169+
// As one of the listing method completes, it is expected to cancel context for the
170+
// only then return error. other method. If both sequential and worksteal listing
171+
// fail due to context canceled, return error.
140172
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
141173
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
142174
}
143175

144-
// If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete.
176+
// If ranges for worksteal and pageToken for sequential listing is empty, then
177+
// listing is complete.
145178
if c.pageToken == "" {
146179
return results, iterator.Done
147180
}
@@ -150,6 +183,49 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
150183

151184
// Close closes the range channel of the Lister.
152185
func (c *Lister) Close() {
186+
if c.ranges != nil {
187+
close(c.ranges)
188+
}
189+
}
153190

154-
// TODO: Close range channel for worksteal lister.
191+
// updateStartEndOffset updates start and end offset based on prefix.
192+
// If a prefix is given, adjust start and end value such that it lists
193+
// objects with the given prefix. updateStartEndOffset assumes prefix will
194+
// be added to the object name while listing objects in worksteal algorithm.
195+
//
196+
// For example:
197+
// start = "abc", end = "prefix_a", prefix = "prefix",
198+
//
199+
// end will change to "_a", prefix will be added in worksteal algorithm.
200+
// "abc" is lexicographically smaller than "prefix". So start will be the first
201+
// object with the given prefix.
202+
//
203+
// Therefore start will change to ""(empty string) and end to "_a" .
204+
func updateStartEndOffset(start, end, prefix string) (string, string) {
205+
if prefix == "" {
206+
return start, end
207+
}
208+
if start != "" && end != "" && start >= end {
209+
return start, start
210+
}
211+
if start != "" {
212+
if start <= prefix {
213+
start = ""
214+
} else if strings.HasPrefix(start, prefix) {
215+
start = start[len(prefix):]
216+
} else {
217+
return start, start
218+
}
219+
}
220+
221+
if end != "" {
222+
if len(end) > len(prefix) && strings.HasPrefix(end, prefix) {
223+
end = end[len(prefix):]
224+
} else if end > prefix {
225+
end = ""
226+
} else {
227+
return end, end
228+
}
229+
}
230+
return start, end
155231
}

storage/dataflux/fast_list_test.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dataflux
16+
17+
import (
18+
"runtime"
19+
"testing"
20+
21+
"cloud.google.com/go/storage"
22+
)
23+
24+
func TestUpdateStartEndOffset(t *testing.T) {
25+
testcase := []struct {
26+
desc string
27+
start string
28+
end string
29+
prefix string
30+
wantStart string
31+
wantEnd string
32+
}{
33+
// List all objects with the given prefix.
34+
{
35+
desc: "start and end are empty",
36+
start: "",
37+
end: "",
38+
prefix: "pre",
39+
wantStart: "",
40+
wantEnd: "",
41+
},
42+
{
43+
desc: "start is longer and lexicographically before prefix",
44+
start: "abcqre",
45+
end: "",
46+
prefix: "pre",
47+
wantStart: "",
48+
wantEnd: "",
49+
},
50+
{
51+
desc: "start value same as prefix",
52+
start: "pre",
53+
end: "",
54+
prefix: "pre",
55+
wantStart: "",
56+
wantEnd: "",
57+
},
58+
{
59+
desc: "lexicographically start comes before prefix and end after prefix",
60+
start: "abc",
61+
end: "xyz",
62+
prefix: "pre",
63+
wantStart: "",
64+
wantEnd: "",
65+
},
66+
// List bounded objects within the given prefix.
67+
{
68+
desc: "start value contains prefix",
69+
start: "pre_a",
70+
end: "",
71+
prefix: "pre",
72+
wantStart: "_a",
73+
wantEnd: "",
74+
},
75+
{
76+
desc: "end value contains prefix",
77+
start: "",
78+
end: "pre_x",
79+
prefix: "pre",
80+
wantStart: "",
81+
wantEnd: "_x",
82+
},
83+
// With empty prefix, start and end will not be affected.
84+
{
85+
desc: "prefix is empty",
86+
start: "abc",
87+
end: "xyz",
88+
prefix: "",
89+
wantStart: "abc",
90+
wantEnd: "xyz",
91+
},
92+
{
93+
desc: "start is lexicographically higher than end",
94+
start: "xyz",
95+
end: "abc",
96+
prefix: "",
97+
wantStart: "xyz",
98+
wantEnd: "abc",
99+
},
100+
// Cases where no objects will be listed when prefix is given.
101+
{
102+
desc: "end is same as prefix",
103+
start: "",
104+
end: "pre",
105+
prefix: "pre",
106+
wantStart: "pre",
107+
wantEnd: "pre",
108+
},
109+
{
110+
desc: "start is lexicographically higher than end with prefix",
111+
start: "xyz",
112+
end: "abc",
113+
prefix: "pre",
114+
wantStart: "xyz",
115+
wantEnd: "xyz",
116+
},
117+
{
118+
desc: "start is lexicographically higher than prefix",
119+
start: "xyz",
120+
end: "",
121+
prefix: "pre",
122+
wantStart: "xyz",
123+
wantEnd: "xyz",
124+
},
125+
}
126+
127+
for _, tc := range testcase {
128+
t.Run(tc.desc, func(t *testing.T) {
129+
gotStart, gotEnd := updateStartEndOffset(tc.start, tc.end, tc.prefix)
130+
if gotStart != tc.wantStart || gotEnd != tc.wantEnd {
131+
t.Errorf("updateStartEndOffset(%q, %q, %q) got = (%q, %q), want = (%q, %q)", tc.start, tc.end, tc.prefix, gotStart, gotEnd, tc.wantStart, tc.wantEnd)
132+
}
133+
})
134+
}
135+
}
136+
137+
func TestNewLister(t *testing.T) {
138+
gcs := &storage.Client{}
139+
bucketName := "test-bucket"
140+
testcase := []struct {
141+
desc string
142+
query storage.Query
143+
parallelism int
144+
wantStart string
145+
wantEnd string
146+
wantParallelism int
147+
}{
148+
{
149+
desc: "start and end are empty",
150+
query: storage.Query{Prefix: "pre"},
151+
parallelism: 1,
152+
wantStart: "",
153+
wantEnd: "",
154+
wantParallelism: 1,
155+
},
156+
{
157+
desc: "start is longer than prefix",
158+
query: storage.Query{Prefix: "pre", StartOffset: "pre_a"},
159+
parallelism: 1,
160+
wantStart: "_a",
161+
wantEnd: "",
162+
wantParallelism: 1,
163+
},
164+
{
165+
desc: "start and end are empty",
166+
query: storage.Query{Prefix: "pre"},
167+
parallelism: 0,
168+
wantStart: "",
169+
wantEnd: "",
170+
wantParallelism: 10 * runtime.NumCPU(),
171+
},
172+
}
173+
174+
for _, tc := range testcase {
175+
t.Run(tc.desc, func(t *testing.T) {
176+
in := ListerInput{
177+
BucketName: bucketName,
178+
BatchSize: 0,
179+
Query: tc.query,
180+
Parallelism: tc.parallelism,
181+
}
182+
df := NewLister(gcs, &in)
183+
defer df.Close()
184+
if len(df.ranges) != 1 {
185+
t.Errorf("NewLister(%v, %v %v, %v) got len of ranges = %v, want = %v", bucketName, 1, 0, tc.query, len(df.ranges), 1)
186+
}
187+
ranges := <-df.ranges
188+
if df.method != open || df.pageToken != "" || ranges.startRange != tc.wantStart || ranges.endRange != tc.wantEnd || df.parallelism != tc.wantParallelism {
189+
t.Errorf("NewLister(%q, %d, %d, %v) got = (method: %v, token: %q, start: %q, end: %q, parallelism: %d), want = (method: %v, token: %q, start: %q, end: %q, parallelism: %d)", bucketName, 1, 0, tc.query, df.method, df.pageToken, ranges.startRange, ranges.endRange, df.parallelism, open, "", tc.wantStart, tc.wantEnd, tc.wantParallelism)
190+
}
191+
192+
})
193+
}
194+
}

0 commit comments

Comments
 (0)