@@ -18,6 +18,8 @@ import (
1818 "context"
1919 "errors"
2020 "fmt"
21+ "runtime"
22+ "strings"
2123
2224 "cloud.google.com/go/storage"
2325 "golang.org/x/sync/errgroup"
@@ -41,53 +43,80 @@ type ListerInput struct {
4143 // BucketName is the name of the bucket to list objects from. Required.
4244 BucketName string
4345
44- // Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
46+ // Parallelism is number of parallel workers to use for listing.
47+ // Default value is 10x number of available CPU. Optional.
4548 Parallelism int
4649
47- // BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
48- // The number of objects returned will be rounded up to a multiple of gcs page size.
50+ // BatchSize is the number of objects to list. Default value returns
51+ // all objects at once. The number of objects returned will be
52+ // rounded up to a multiple of gcs page size. Optional.
4953 BatchSize int
5054
51- // Query is the query to filter objects for listing. Default value is nil. Optional.
52- // Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects.
55+ // Query is the query to filter objects for listing. Default value is nil.
56+ // Use ProjectionNoACL for faster listing. Including ACLs increases
57+ // latency while fetching objects. Optional.
5358 Query storage.Query
5459
55- // SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
60+ // SkipDirectoryObjects is to indicate whether to list directory objects.
61+ // Default value is false. Optional.
5662 SkipDirectoryObjects bool
5763}
5864
59- // Lister is used for interacting with Dataflux fast-listing.
60- // The caller should initialize it with NewLister() instead of creating it directly.
65+ // Lister is used for interacting with Dataflux fast-listing. The caller should
66+ // initialize it with NewLister() instead of creating it directly.
6167type Lister struct {
62- // method indicates the listing method(open, sequential, worksteal) to be used for listing.
68+ // method indicates the listing method(open, sequential, worksteal) to
69+ // be used for listing.
6370 method listingMethod
6471
65- // pageToken is the token to use for sequential listing.
66- pageToken string
67-
6872 // bucket is the bucket handle to list objects from.
6973 bucket * storage.BucketHandle
7074
7175 // batchSize is the number of objects to list.
7276 batchSize int
7377
78+ // parallelism is number of parallel workers to use for listing.
79+ parallelism int
80+
7481 // query is the query to filter objects for listing.
7582 query storage.Query
7683
84+ // pageToken is the token to use for sequential listing.
85+ pageToken string
86+
87+ // ranges is the channel to store the start and end ranges to be listed
88+ // by the workers in worksteal listing.
89+ ranges chan * listRange
90+
7791 // skipDirectoryObjects is to indicate whether to list directory objects.
7892 skipDirectoryObjects bool
7993}
8094
8195// NewLister creates a new dataflux Lister to list objects in the give bucket.
8296func NewLister (c * storage.Client , in * ListerInput ) * Lister {
8397 bucket := c .Bucket (in .BucketName )
98+
99+ // If parallelism is not given, set default value to 10x the number of
100+ // available CPU.
101+ if in .Parallelism == 0 {
102+ in .Parallelism = runtime .NumCPU () * 10
103+ }
104+ // Initialize range channel with entire namespace of object for given
105+ // prefix, startoffset and endoffset. For the default range to list is
106+ // entire namespace, start and end will be empty.
107+ rangeChannel := make (chan * listRange , in .Parallelism * 2 )
108+ start , end := updateStartEndOffset (in .Query .StartOffset , in .Query .EndOffset , in .Query .Prefix )
109+ rangeChannel <- & listRange {startRange : start , endRange : end }
110+
84111 lister := & Lister {
85112 method : open ,
113+ parallelism : in .Parallelism ,
86114 pageToken : "" ,
87115 bucket : bucket ,
88116 batchSize : in .BatchSize ,
89117 query : in .Query ,
90118 skipDirectoryObjects : in .SkipDirectoryObjects ,
119+ ranges : rangeChannel ,
91120 }
92121 return lister
93122}
@@ -102,13 +131,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
102131 var results []* storage.ObjectAttrs
103132 ctx , cancel := context .WithCancel (ctx )
104133 defer cancel ()
105- // Errgroup takes care of running both methods in parallel. As soon as one of the method
106- // is complete, the running method also stops.
134+ // Errgroup takes care of running both methods in parallel. As soon as one of
135+ // the methods completes, the other running method is stopped as well.
107136 g , childCtx := errgroup .WithContext (ctx )
108137
109- // To start listing method is Open and runs both worksteal and sequential listing in parallel.
110- // The method which completes first is used for all subsequent runs.
138+ // To start listing method is Open and runs both worksteal and sequential listing
139+ // in parallel. The method which completes first is used for all subsequent runs.
140+
111141 // TODO: Run worksteal listing when method is Open or WorkSteal.
142+
112143 // Run sequential listing when method is Open or Sequential.
113144 if c .method != worksteal {
114145
@@ -118,8 +149,8 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
118149 countError ++
119150 return fmt .Errorf ("error in running sequential listing: %w" , err )
120151 }
121- // If sequential listing completes first, set method to sequential listing and ranges to nil.
122- // The nextToken will be used to continue sequential listing.
152+ // If sequential listing completes first, set method to sequential listing
153+ // and ranges to nil. The nextToken will be used to continue sequential listing.
123154 results = objects
124155 c .pageToken = nextToken
125156 c .method = sequential
@@ -135,13 +166,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
135166 // If the error is not context.Canceled, then return error instead of falling back
136167 // to the other method. This is so that the error can be fixed and user can take
137168 // advantage of fast-listing.
138- // As one of the listing method completes, it is expected to cancel context for the other method.
139- // If both sequential and worksteal listing fail due to context canceled, only then return error.
169+ // When one of the listing methods completes, it is expected to cancel the context
170+ // for the other method. Only if both sequential and worksteal listing fail due to
171+ // context canceled, return error.
140172 if err != nil && (! errors .Is (err , context .Canceled ) || countError > 1 ) {
141173 return nil , fmt .Errorf ("failed waiting for sequntial and work steal lister : %w" , err )
142174 }
143175
144- // If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete.
176+ // If ranges for worksteal and pageToken for sequential listing is empty, then
177+ // listing is complete.
145178 if c .pageToken == "" {
146179 return results , iterator .Done
147180 }
@@ -150,6 +183,49 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
150183
151184// Close closes the range channel of the Lister.
152185func (c * Lister ) Close () {
186+ if c .ranges != nil {
187+ close (c .ranges )
188+ }
189+ }
153190
154- // TODO: Close range channel for worksteal lister.
// updateStartEndOffset rewrites the start and end offsets so that they are
// relative to prefix. It assumes the worksteal algorithm adds the prefix back
// to the object name while listing.
//
// With an empty prefix both offsets are returned untouched. Otherwise:
//   - a start that sorts at or before the prefix becomes "" (listing begins
//     at the first object with the prefix), and a start that carries the
//     prefix has the prefix stripped;
//   - an end that carries the prefix and is longer than it has the prefix
//     stripped, and any other end that sorts after the prefix becomes "";
//   - when the offsets are inverted, or an offset shows that no object with
//     the prefix can fall inside the range, the same non-empty offset is
//     returned twice (presumably read by the caller as an empty range).
//
// For example, start = "abc", end = "prefix_a", prefix = "prefix" gives
// start = "" because "abc" is lexicographically smaller than "prefix", and
// end = "_a" because the prefix is stripped.
func updateStartEndOffset(start, end, prefix string) (string, string) {
	if prefix == "" {
		return start, end
	}

	hasStart, hasEnd := start != "", end != ""
	if hasStart && hasEnd && start >= end {
		return start, start
	}

	if hasStart {
		switch {
		case start <= prefix:
			start = ""
		case strings.HasPrefix(start, prefix):
			start = strings.TrimPrefix(start, prefix)
		default:
			return start, start
		}
	}

	if hasEnd {
		switch {
		case end != prefix && strings.HasPrefix(end, prefix):
			end = strings.TrimPrefix(end, prefix)
		case end > prefix:
			end = ""
		default:
			return end, end
		}
	}

	return start, end
}
0 commit comments