File: ./doc.go
   1 /*
   2 # syncplus
   3 
   4 Package implementing common concurrency patterns the standard library doesn't.
   5 
   6 interface Doer
   7 
   8 Doer is an interface to allow using sync.Once and the results of syncplus.UpTo
   9 interchangeably. All it does is require a `Do` method, which takes a 0-input
  10 func with no return values.
  11 
  12 func UpTo
  13 
  14 UpTo lets you create a generalization of sync.Once, specifically a value with
  15 a `Do` method which has an effect up to n times, instead of just one.
  16 
  17 interface Waiter
  18 
  19 Waiter is a purposeful downgrade of sync.WaitGroup's functionality, so you can
  20 only Wait on something but not affect the blocker's bookkeeping in any way,
  21 thus guaranteeing its safe use from the calling side, which receives a Waiter
  22 from an API.
  23 
  24 func NewWaitOnlyGroup
  25 
  26 NewWaitOnlyGroup creates a Waiter, which unlike sync.WaitGroup doesn't let you
  27 change it yourself.
  28 
  29 type Context
  30 
  31 Context is just an alias for context.Context from the stdlib.
  32 
  33 type LoopLimiter
  34 
  35 LoopLimiter lets you configure various rate-limited concurrent funcs from this
  36 package, allowing a max-concurrency limit, as well as a context callbacks can
  37 use to stop current and further concurrent dispatching.
  38 
  39 Both fields are optional and are auto-replaced by sensible defaults: the number
  40 of cores for MaxConcurrency, and the background context for Context.
  41 
  42 func LoopIntegers(LoopLimiter, first, last int, f func(syncplus.Context, int64)) Waiter
  43 
  44 LoopIntegers dispatches asynchronous processing over an inclusive for-loop over
  45 int64 values, limiting how many tasks are running at any time; backward loops
  46 aren't supported yet.
  47 
  48 func LoopSlice[V any](LoopLimiter, []V, func(syncplus.Context, int64, V)) Waiter
  49 
  50 LoopSlice dispatches asynchronous processing over values from the slice given,
  51 limiting how many tasks are running at any time.
  52 
  53 func LoopMap[K comparable, V any](LoopLimiter, map[K]V, func(syncplus.Context, K, V)) Waiter
  54 
  55 LoopMap dispatches asynchronous processing over key-value pairs from the map
  56 given, limiting how many tasks are running at any time.
  57 */
  58 package syncplus

     File: ./go.mod
   1 module syncplus
   2 
   3 go 1.18

     File: ./limiters.go
   1 package syncplus
   2 
   3 import (
   4     "context"
   5     "runtime"
   6     "sync"
   7 )
   8 
   9 // Context is just an alias for context.Context from the stdlib.
  10 // type Context = context.Context
  11 
  12 // Doer is a generic function caller. Type sync.Once from the stdlib is a
  13 // Doer, for example, as are the results of func syncplus.UpTo.
  14 type Doer interface {
  15     Do(f func())
  16 }
  17 
  18 // UpTo lets you call funcs up to the number of times given, via the result's
  19 // Do method; after that, the Do method will refuse to call the func given.
  20 //
  21 // This func's result generalizes what sync.Once from the stdlib does: both
  22 // types implement interface syncplus.Doer, with its Do method.
  23 //
  24 // When this func is given less than 1 as its value, the result will never do
  25 // anything when its method Do is called.
  26 //
  27 // # Example
  28 //
  29 //  doer := syncplus.UpTo(n)
  30 //  ...
  31 //  doer.Do(func () {
  32 //      ...
  33 //  })
  34 func UpTo(times int) Doer {
  35     return &limitedDoer{Times: times}
  36 }
  37 
  38 // limitedDoer generalizes sync.Once, allowing func `Do` to work up to n times
  39 type limitedDoer struct {
  40     // Times limits how many times func Do runs the callbacks given to it
  41     Times int
  42 
  43     // mut serializes runs of func Do
  44     mut sync.Mutex
  45 }
  46 
  47 // Do calls the func given to it, guaranteeing the mutual-exclusion of separate
  48 // calls to it, as sync.Once does, and implements interface syncplus.Doer.
  49 func (ld *limitedDoer) Do(f func()) {
  50     ld.mut.Lock()
  51     defer ld.mut.Unlock()
  52 
  53     if ld.Times <= 0 {
  54         // calls are up, so do nothing
  55         return
  56     }
  57 
  58     ld.Times--
  59     if f != nil {
  60         f()
  61     }
  62 }
  63 
  64 // LoopLimiter lets you configure various rate-limited concurrent funcs from
  65 // this package, allowing a max-concurrency limit, as well as a context
  66 // callbacks can use to stop current and further concurrent dispatching.
  67 //
  68 // Both fields are optional and are auto-replaced by sensible defaults: the
  69 // number of cores for MaxConcurrency, and the background context for Context.
  70 type LoopLimiter struct {
  71     MaxConcurrency int
  72     Context        context.Context
  73 }
  74 
  75 // NewLoopLimiter is the main constructor for type LoopLimiter, and returns
  76 // values which are always valid and ready to use.
  77 func NewLoopLimiter(max int, ctx context.Context) LoopLimiter {
  78     var ll LoopLimiter
  79     ll.MaxConcurrency = max
  80     ll.Context = ctx
  81     ll.fix()
  82     return ll
  83 }
  84 
  85 // fix ensures a LoopLimiter's settings are valid
  86 func (ll *LoopLimiter) fix() {
  87     if ll.MaxConcurrency < 1 {
  88         ll.MaxConcurrency = runtime.NumCPU()
  89     }
  90     if ll.Context == nil {
  91         ll.Context = context.Background()
  92     }
  93 }
  94 
  95 // LoopInt64 runs an integer for-loop on goroutines, while also rate-limiting
  96 // them: this means you limit how many things are allowed to happen at the same
  97 // time at most.
  98 //
  99 // Both limit values are inclusive. Backward loops aren't supported when the
 100 // first number is bigger than the second. A concurrency parameter less than 1
 101 // means use the number of cores on the machine.
 102 //
 103 // Each callback will run inside its own goroutine: this means completion-order
 104 // of the callbacks isn't guaranteed. All that's guaranteed is asynchrony.
 105 //
 106 // You can use the returned value's Wait method to block until all callbacks
 107 // have finished, but that's optional. If you just ignore that, the dispatching
 108 // also happens in a goroutine, giving you full asynchrony if you want so.
 109 //
 110 // # Example
 111 //
 112 //  // to save memory and avoid timeouts related to resource-exhaustion,
 113 //  // ensure only up to 50 ports will be checked at once
 114 //  ll := syncplus.LoopLimiter{MaxConcurrency: 50}
 115 //
 116 //  // scan all TCP ports, which range from 1 to 65,535; since doing it in a
 117 //  // simple for-loop would take a while, concurrent scanning of different
 118 //  // ports speeds things up a lot
 119 //  w := syncplus.LoopInt64(ll, 1, 65_535, func(ctx context.Context, port int64) {
 120 //      res, err := testConnection("tcp", int(port))
 121 //      if err != nil {
 122 //          log.Println(err.Error())
 123 //          return
 124 //      }
 125 //      ...
 126 //  })
 127 //
 128 //  // wait until all the rate-limited async callbacks are done
 129 //  w.Wait()
 130 func LoopInt64(ll LoopLimiter, first, last int64, f func(ctx context.Context, n int64)) Waiter {
 131     if first <= last {
 132         return loopIntForward(ll, first, last, f)
 133     }
 134     // return loopIntBackward(ll, first, last, f)
 135     return NoWait{}
 136 }
 137 
 138 // loopIntForward handles forward loops for func LoopInt64
 139 func loopIntForward(ll LoopLimiter, first, last int64, f func(ctx context.Context, n int64)) Waiter {
 140     if first > last {
 141         // don't bother setting-up any dispatching for empty loops
 142         return NoWait{}
 143     }
 144 
 145     ll.fix()
 146     stopped := ll.Context.Done()
 147 
 148     var waiter sync.WaitGroup
 149     // make the waiter return actually wait then its func Wait is called
 150     waiter.Add(1)
 151 
 152     // launch async tasks asynchronously, so the caller gets its waiter
 153     // right away as this func returns normally
 154     go func() {
 155         // signal the func's caller that everything is done
 156         defer waiter.Done()
 157 
 158         // handle serial case more efficiently, by avoiding wait-groups
 159         if ll.MaxConcurrency == 1 {
 160             for n := first; n <= last; n++ {
 161                 select {
 162                 case <-stopped:
 163                     // return right away
 164                     return
 165 
 166                 default:
 167                     f(ll.Context, n)
 168                 }
 169             }
 170 
 171             // avoid looping twice
 172             return
 173         }
 174 
 175         // handle general case, with more than 1 task allowed at once
 176         for n := first; n <= last; {
 177             size := int64(ll.MaxConcurrency)
 178             if left := last - n + 1; left < int64(ll.MaxConcurrency) {
 179                 size = left
 180             }
 181 
 182             // prepare to wait for all tasks from the current batch
 183             var wg sync.WaitGroup
 184             wg.Add(int(size))
 185 
 186             for j := int64(0); j < size; j++ {
 187                 select {
 188                 case <-stopped:
 189                     // return right away, without starting any more tasks
 190                     return
 191 
 192                 default:
 193                     go func(n int64) {
 194                         defer wg.Done()
 195                         f(ll.Context, n)
 196                     }(int64(n))
 197                     n++
 198                 }
 199             }
 200 
 201             // wait for all tasks from the current batch
 202             wg.Wait()
 203         }
 204     }()
 205 
 206     // give caller the option to wait until all is done
 207     return NewWaitOnlyGroup(&waiter)
 208 }
 209 
 210 // LoopSlice uses the loop-limiter given to dispatch up to n instances of a
 211 // callback on the given slice's items.
 212 func LoopSlice[V any](ll LoopLimiter, arr []V, f func(ctx context.Context, i int64, v V)) Waiter {
 213     if len(arr) == 0 {
 214         // don't bother setting-up any dispatching for empty slices
 215         return NoWait{}
 216     }
 217 
 218     ll.fix()
 219     stopped := ll.Context.Done()
 220     var waiter sync.WaitGroup
 221     // make the waiter return actually wait then its func Wait is called
 222     waiter.Add(1)
 223 
 224     // launch async tasks asynchronously, so the caller gets its waiter
 225     // right away as this func returns normally
 226     go func() {
 227         // signal the func's caller that everything is done
 228         defer waiter.Done()
 229 
 230         // handle serial case more efficiently, by avoiding wait-groups
 231         if ll.MaxConcurrency == 1 {
 232             for i, v := range arr {
 233                 select {
 234                 case <-stopped:
 235                     // return right away
 236                     return
 237 
 238                 default:
 239                     f(ll.Context, int64(i), v)
 240                 }
 241             }
 242 
 243             // avoid looping twice
 244             return
 245         }
 246 
 247         // handle general case, with more than 1 task allowed at once
 248 
 249         // handle general case, with more than 1 task allowed at once
 250         for index := 0; len(arr) > 0; {
 251             size := int64(ll.MaxConcurrency)
 252             if len(arr) < ll.MaxConcurrency {
 253                 size = int64(len(arr))
 254             }
 255 
 256             // prepare to wait for all tasks from the current batch
 257             var wg sync.WaitGroup
 258             wg.Add(int(size))
 259 
 260             for j := int64(0); j < size; j++ {
 261                 select {
 262                 case <-stopped:
 263                     // return right away, without starting any more tasks
 264                     return
 265 
 266                 default:
 267                     go func(i int64, v V) {
 268                         defer wg.Done()
 269                         f(ll.Context, i, v)
 270                     }(int64(index), arr[0])
 271                     arr = arr[1:]
 272                     index++
 273                 }
 274             }
 275 
 276             // wait for all tasks from the current batch
 277             wg.Wait()
 278         }
 279     }()
 280 
 281     // give caller the option to wait until all is done
 282     return NewWaitOnlyGroup(&waiter)
 283 }
 284 
 285 // LoopMap uses the loop-limiter given to dispatch up to n instances of a
 286 // callback on the given map's key-value pairs.
 287 func LoopMap[K comparable, V any](ll LoopLimiter, kv map[K]V, f func(ctx context.Context, k K, v V)) Waiter {
 288     if len(kv) == 0 {
 289         // don't bother setting-up any dispatching for empty slices
 290         return NoWait{}
 291     }
 292 
 293     ll.fix()
 294     stopped := ll.Context.Done()
 295     var waiter sync.WaitGroup
 296     // make the waiter return actually wait then its func Wait is called
 297     waiter.Add(1)
 298 
 299     // launch async tasks asynchronously, so the caller gets its waiter
 300     // right away as this func returns normally
 301     go func() {
 302         // signal the func's caller that everything is done
 303         defer waiter.Done()
 304 
 305         // handle serial case more efficiently, by avoiding wait-groups
 306         if ll.MaxConcurrency == 1 {
 307             for k, v := range kv {
 308                 select {
 309                 case <-stopped:
 310                     // return right away
 311                     return
 312 
 313                 default:
 314                     f(ll.Context, k, v)
 315                 }
 316             }
 317 
 318             // avoid looping twice
 319             return
 320         }
 321 
 322         // itemsLeft keeps track of when rate-limiting logic isn't needed
 323         // anymore
 324         itemsLeft := len(kv)
 325 
 326         // handle general case, with more than 1 task allowed at once
 327         for k, v := range kv {
 328             size := int64(ll.MaxConcurrency)
 329             if itemsLeft < ll.MaxConcurrency {
 330                 size = int64(itemsLeft)
 331             }
 332 
 333             // prepare to wait for all tasks from the current batch
 334             var wg sync.WaitGroup
 335             wg.Add(int(size))
 336 
 337             for j := int64(0); j < size; j++ {
 338                 select {
 339                 case <-stopped:
 340                     // return right away, without starting any more tasks
 341                     return
 342 
 343                 default:
 344                     go func(k K, v V) {
 345                         defer wg.Done()
 346                         f(ll.Context, k, v)
 347                     }(k, v)
 348                     itemsLeft--
 349                 }
 350             }
 351 
 352             // wait for all tasks from the current batch
 353             wg.Wait()
 354         }
 355     }()
 356 
 357     // give caller the option to wait until all is done
 358     return NewWaitOnlyGroup(&waiter)
 359 }
 360 
 361 // LoopChannel uses the loop-limiter given to dispatch up to n instances of a
 362 // callback on the given channel values.
 363 func LoopChannel[V any](ll LoopLimiter, src <-chan V, f func(ctx context.Context, v V)) Waiter {
 364     ll.fix()
 365     var waiter sync.WaitGroup
 366     waiter.Add(1)
 367 
 368     go func() {
 369         ctx := ll.Context
 370         done := ctx.Done()
 371         defer waiter.Done()
 372 
 373         // turn is a concurrency-throttler, that's why it uses empty/no data
 374         turn := make(chan struct{}, ll.MaxConcurrency)
 375         defer close(turn)
 376 
 377         for v := range src {
 378             select {
 379             case <-done:
 380                 // quit right away, when told to by the oustide
 381                 return
 382 
 383             default:
 384                 // wait for the (next) callback's turn
 385                 turn <- struct{}{}
 386 
 387                 go func(v V) {
 388                     // end the callback's turn when it's over
 389                     defer func() { <-turn }()
 390                     f(ctx, v)
 391                 }(v)
 392             }
 393         }
 394     }()
 395 
 396     // give caller the option to wait until all is done
 397     return NewWaitOnlyGroup(&waiter)
 398 }

     File: ./limiters_test.go
   1 package syncplus
   2 
   3 import (
   4     "context"
   5     "sync"
   6     "testing"
   7 )
   8 
   9 func TestLoopInt64(t *testing.T) {
  10     tests := []struct {
  11         Name        string
  12         First       int64
  13         Last        int64
  14         Concurrency int
  15     }{
  16         {`forward loop`, -3, 54, -1},
  17         {`single item`, -3, -3, -1},
  18         {`empty loop`, 8, 2, -1},
  19     }
  20 
  21     for _, tc := range tests {
  22         t.Run(tc.Name, func(t *testing.T) {
  23             var mutex sync.Mutex
  24             counts := make(map[int64]int)
  25             ll := LoopLimiter{MaxConcurrency: tc.Concurrency}
  26 
  27             LoopInt64(ll, tc.First, tc.Last, func(ctx context.Context, n int64) {
  28                 mutex.Lock()
  29                 defer mutex.Unlock()
  30                 counts[n]++
  31             }).Wait()
  32 
  33             n := tc.Last - tc.First + 1
  34             // backward loops aren't supported, at least for now
  35             if n < 0 {
  36                 n = 0
  37             }
  38 
  39             ncounts := int64(len(counts))
  40             if ncounts != n {
  41                 t.Fatalf(`got %d items instead of %d`, len(counts), n)
  42                 return
  43             }
  44 
  45             for i, c := range counts {
  46                 if c != 1 {
  47                     const fs = `number %d dispatched not once, but %d times`
  48                     t.Fatalf(fs, i, c)
  49                     return
  50                 }
  51             }
  52         })
  53     }
  54 }

     File: ./waiters.go
   1 package syncplus
   2 
   3 import "sync"
   4 
   5 // Waiter is an interface representing any kind of control-flow blocker, such
   6 // as sync.WaitGroup from the standard library. Type NoWait, and the results
   7 // of func NewWaitOnlyGroup also implement it.
   8 type Waiter interface {
   9     Wait()
  10 }
  11 
  12 // waitOnlyGroup hides a sync.WaitGroup pointer so you can only Wait on it and
  13 // not change its state in any way (via its Add and Done methods), making it
  14 // safer to use from the caller side. Also, you can use nil pointers safely.
  15 type waitOnlyGroup struct {
  16     wg *sync.WaitGroup
  17 }
  18 
  19 // NewWaitOnlyGroup wraps a sync.WaitGroup into a Waiter with only a `Wait`
  20 // method: funcs returning such values can prevent callers from tampering
  21 // with internal WaitGroup logic, contributing to more reliable APIs.
  22 //
  23 // Using nil pointers is safe, but using type NoWait may be preferable to
  24 // achieve the same fake-wait effect.
  25 //
  26 // # Examples
  27 //
  28 // nowait := NewWaitOnlyGroup(nil)
  29 // ...
  30 // nowait.Wait()
  31 //
  32 // // ----------
  33 //
  34 // var wg sync.WaitGroup
  35 // wog := NewWaitOnlyGroup(&wg)
  36 // ...
  37 // wog.Wait()
  38 func NewWaitOnlyGroup(wg *sync.WaitGroup) Waiter {
  39     return waitOnlyGroup{wg}
  40 }
  41 
  42 // Wait blocks on an internal sync.WaitGroup pointer, does nothing when
  43 // that pointer is nil, and implements the syncplus.Waiter interface.
  44 func (wog waitOnlyGroup) Wait() {
  45     if wg := wog.wg; wg != nil {
  46         wg.Wait()
  47     }
  48 }
  49 
  50 // NoWait implements the Waiter interface without actually blocking: you can
  51 // return this in your own Waiter-returning funcs to handle 0-item scenarios.
  52 type NoWait struct{}
  53 
  54 // Wait returns immediately, and implements the syncplus.Waiter interface.
  55 func (nw NoWait) Wait() {}

     File: ./waiters_test.go
   1 package syncplus
   2 
   3 import (
   4     "sync"
   5     "testing"
   6     "time"
   7 )
   8 
   9 func TestWaitOnlyGroup(t *testing.T) {
  10     var wg sync.WaitGroup
  11     var waiter Waiter = NewWaitOnlyGroup(&wg)
  12 
  13     wg.Add(1)
  14     go func() {
  15         time.Sleep(10 * time.Millisecond)
  16         wg.Done()
  17     }()
  18     waiter.Wait()
  19 }