Handling Shopify API Rate Limits with Goroutines
I frequently build integration applications with Shopify, and I thought it would be helpful to highlight one approach (of many) that I commonly use for handling rate limits with Shopify's GraphQL API in Go. The integration in question was originally developed and maintained by a third-party provider. However, after their company was acquired, they decided to shut down their service and stop all servicing of the integration, so it needed to be rebuilt from scratch.
The original integration worked by accepting a standardized CSV file from a third-party service through a webhook. The CSV file contained a list of SKUs and their current inventory levels, which needed to be synced with the Shopify store. This same CSV format was used by the service for multiple merchants at once and typically contained between 3,000–3,500 rows. Not every SKU in the CSV necessarily existed in the store, so that had to be accounted for during processing.
In summary, the rebuild of the integration had to handle the following:
- Read each line from the CSV
- Check if the SKU exists in the store
- If it does, update its inventory to the specified value
Format example of the CSV:
ITEM_CODE,QTY
00288,5
22991,1
23211,18
...
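To make the processing flow concrete, here is a minimal sketch of how the CSV could be read in Go before any Shopify calls happen. The file name and the processRow helper are hypothetical placeholders for illustration; processRow stands in for the lookup-and-update logic described below.
package main
import (
    "encoding/csv"
    "log"
    "os"
    "strconv"
)
func main() {
    // Open the CSV received from the webhook (file name is a placeholder).
    f, err := os.Open("inventory.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    rows, err := csv.NewReader(f).ReadAll()
    if err != nil {
        log.Fatal(err)
    }
    if len(rows) < 2 {
        log.Fatal("csv contained no data rows")
    }

    // Skip the ITEM_CODE,QTY header row and process each line.
    for _, row := range rows[1:] {
        qty, err := strconv.Atoi(row[1])
        if err != nil {
            log.Printf("skipping row with bad quantity: %v", err)
            continue
        }
        processRow(row[0], qty)
    }
}
// processRow is a placeholder for checking whether the SKU exists in the
// store and, if it does, updating its inventory to the given quantity.
func processRow(sku string, qty int) {
    log.Printf("would sync SKU %s to quantity %d", sku, qty)
}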
Additionally, Shopify's GraphQL API rate limits obviously had to be a high priority to factor in. For this specific Shopify store, the limits were:
- 2000 available points
- 100 points refilled per second
Each row in the CSV required one to two GraphQL calls to Shopify, depending on whether the SKU existed in the store. A query was needed to fetch the SKU, and if it was found, a mutation followed which would update the inventory.
By analyzing the query and mutation costs in Shopify's GraphQL app, it was determined that each query consumed 2 points, while each mutation consumed 10 points. This meant that, in the worst-case scenario where every SKU existed, a total of 12 points would be consumed per row.
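As a quick back-of-the-envelope check (my own derivation from the numbers above, not something measured in the original integration), these costs already bound how fast rows can be processed over a sustained run:
package main
import "fmt"
func main() {
    // Constants mirror the limits and costs described above.
    const (
        maxPoints       = 2000 // Available points.
        refillPerSecond = 100  // Points refilled per second.
        queryCost       = 2    // Cost to look up a SKU.
        mutationCost    = 10   // Cost to update inventory for a found SKU.
    )

    worstCasePerRow := queryCost + mutationCost // 12 points when the SKU exists.

    // Sustained throughput is bounded by the refill rate: ~8 rows per second.
    fmt.Println("sustained rows/sec:", refillPerSecond/worstCasePerRow)

    // The full bucket only covers a short burst on top of that: ~166 rows.
    fmt.Println("worst-case rows before the bucket empties:", maxPoints/worstCasePerRow)
}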
My goal was to maximize the number of concurrent queries and mutations without blowing through the available points too quickly, while at the same time maintaining a safeguard to throttle requests if the point threshold was reached, allowing time for points to refill and ensuring the updates could continue without having requests rejected by Shopify.
With the point consumption determined, I reused a script I had previously developed to simulate depleting the available points. I modified the simulation to deplete the available points by 12 multiplied by a varying number of concurrently running updates, while also refilling the available points at a rate of 100 points per second. The simulation also kept track of the number of requests happening per second.
After running the simulation with different numbers of concurrent updates, I concluded that between 15-20 concurrent jobs would be a safe balance: draining roughly 180 points per second or more while refilling at a rate of 100 per second results in an average net decrease of about 80 points per second from the available points. Given that a safeguard would be in place to handle hitting the threshold of available points, this was a good balance to continue with.
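The original simulation script isn't included here, but a stripped-down sketch of the idea looks something like the following. The loop structure and the fixed 15-job concurrency are illustrative assumptions; the real script also tracked requests per second.
package main
import "fmt"
func main() {
    const (
        limit       = 2000 // Maximum available points.
        refillRate  = 100  // Points refilled per second.
        costPerJob  = 12   // Worst-case points consumed per row.
        concurrency = 15   // Concurrently running updates being simulated.
    )

    points := limit
    for second := 1; second <= 30; second++ {
        points -= concurrency * costPerJob // Drain caused by the concurrent jobs.
        points += refillRate               // Refill granted for this second.
        if points > limit {
            points = limit
        }
        fmt.Printf("second %2d: %4d points remaining\n", second, points)
        if points <= 0 {
            fmt.Println("points exhausted; this is where a pause/throttle would kick in")
            break
        }
    }
}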
Now, there are several approaches a developer can take to prevent draining the available points down to zero: calling each job in sequence, calling each job with a sleep in between, running batches of jobs in sequence, utilizing a worker pool with a set capacity, and so on.
I decided to go with a semaphore approach. If you're not familiar with it, a semaphore is essentially a concurrency control mechanism that maintains a set capacity of "how many" of something is permitted to run at a time. A process first "acquires" a spot and, when its work is complete, "releases" the spot so another process can acquire it.
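In Go, a counting semaphore can be modeled with nothing more than a buffered channel. Here is a stripped-down illustration of the acquire/release idea, separate from the actual package further below:
package main
import (
    "fmt"
    "sync"
    "time"
)
func main() {
    // The buffered channel acts as the semaphore: its capacity is the
    // number of spots permitted to run at any one time.
    sema := make(chan struct{}, 3)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            sema <- struct{}{}        // Acquire a spot (blocks while all spots are taken).
            defer func() { <-sema }() // Release the spot once the work is done.

            fmt.Println("working:", id)
            time.Sleep(100 * time.Millisecond) // Simulated work.
        }(i)
    }
    wg.Wait()
}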
Since there would be between 3,000-3,500 rows in the CSV (on average), I decided to skip a worker pool setup and simply spin up a Goroutine for each row of the CSV. Each Goroutine would attempt to acquire a spot from the semaphore and, upon releasing that spot, the remaining available points would be checked and handled accordingly.
Additionally, the capacity of the semaphore would be set to the 15-20 limit previously determined from the simulation. If the remaining available points dipped below a set threshold, the release mechanism would cause a "pause" in releasing, calculate the time it would take to refill the available points back to maximum, and then resume releasing. This would allow 15-20 concurrent jobs to run at a time, while the release mechanism acted as the safeguard to ensure the available points were never totally drained.
Example code is below.
package shopifysemaphore
import (
    "sync/atomic"
    "time"
)
// ErrPts is the points value to pass in if a network or other error happens.
// Essentially to be used for situations where no response containing point
// information was returned. This is used to know if the Update method should
// actually update the remaining point balance or not.
var ErrPts int32 = -1
// Balance represents the information of point values and keeps track of
// items such as the remaining points, threshold, limit, and refill rate.
type Balance struct {
    Remaining  atomic.Int32 // Point balance remaining.
    Threshold  int32        // Minimum point balance where we would consider handling with a "pause".
    Limit      int32        // Maximum points available.
    RefillRate int32        // Number of points refilled per second.
}
// NewBalance accepts a threshold (thld) point balance, a maximum (max) point
// balance, and the refill rate (rr). It will return a pointer to Balance.
func NewBalance(thld int32, max int32, rr int32) *Balance {
    b := &Balance{
        Threshold:  thld,
        Limit:      max,
        RefillRate: rr,
    }
    b.Update(max)
    return b
}
// Update accepts a new value of remaining points to store.
func (b *Balance) Update(points int32) {
    if points > ErrPts {
        b.Remaining.Store(points)
    }
}
// RefillDuration accounts for the remaining points, the limit, and the refill rate to
// determine how many seconds it would take to refill the remaining points back to full.
// It will return a duration which can be used to "pause" operations.
func (b *Balance) RefillDuration() time.Duration {
    return time.Duration((b.Limit-b.Remaining.Load())/b.RefillRate) * time.Second
}
// AtThreshold returns a boolean reporting whether the remaining points have
// reached or dropped below the set threshold.
func (b *Balance) AtThreshold() bool {
    return b.Remaining.Load() <= b.Threshold
}
package shopifysemaphore
import (
    "context"
    "sync"
    "time"
)
var (
    DefaultAquireBuffer = 200 * time.Millisecond // Default aquire throttle duration.
    DefaultPauseBuffer  = 1 * time.Second        // Default pause buffer to append to pause duration calculation.
)
// Semaphore is responsible for regulating when to pause and resume processing of Goroutines.
// Points remaining, point thresholds, and point refill rates are taken into
// consideration. If remaining points go below the threshold, a pause is initiated
// which will also calculate how long a pause should happen based on the refill rate.
// Once the pause is completed, processing will resume. A PauseFunc and ResumeFunc
// can optionally be passed in which will fire respectively when a pause happens
// and when a resume happens.
type Semaphore struct {
    *Balance                                // Point information and tracking.
    PauseFunc    func(int32, time.Duration) // Optional callback for when pause happens.
    ResumeFunc   func()                     // Optional callback for when resume happens.
    AquireBuffer time.Duration              // Buffer of time to wait before attempting to re-aquire a spot.
    PauseBuffer  time.Duration              // Buffer of time to extend the pause with.

    pausedAt time.Time     // When paused last happened.
    sema     chan struct{} // Semaphore for controlling the number of Goroutines running.

    mu     sync.Mutex // For handling paused flag control.
    paused bool       // Pause flag.
}
// NewSemaphore returns a pointer to Semaphore. It accepts a cap which represents the
// capacity of how many Goroutines can run at a time, along with information
// about the point balance and, lastly, optional functional options.
func NewSemaphore(cap int, b *Balance, opts ...func(*Semaphore)) *Semaphore {
    sem := &Semaphore{
        Balance: b,
        sema:    make(chan struct{}, cap),
    }
    for _, opt := range opts {
        opt(sem)
    }
    if sem.PauseFunc == nil {
        // Provide default PauseFunc.
        WithPauseFunc(func(_ int32, _ time.Duration) {})(sem)
    }
    if sem.ResumeFunc == nil {
        // Provide default ResumeFunc.
        WithResumeFunc(func() {})(sem)
    }
    if sem.AquireBuffer == 0 {
        WithAquireBuffer(DefaultAquireBuffer)(sem)
    }
    return sem
}
// Aquire will attempt to aquire a spot to run the Goroutine.
// It will continue in a loop until it does aquire a spot, also
// pausing if the pause flag has been enabled. Aquiring is throttled
// at the value of AquireBuffer.
func (sem *Semaphore) Aquire(ctx context.Context) (err error) {
    for aquired := false; !aquired; {
        for {
            if !sem.paused {
                // Not paused, break loop and attempt to aquire a spot.
                break
            }
        }
        // Attempt to aquire a spot, if not we will throttle the next loop.
        select {
        case <-ctx.Done():
            // Context done/cancelled, break loop, and return the error.
            aquired = true
            err = ctx.Err()
        case sem.sema <- struct{}{}:
            // Spot aquired, break loop.
            aquired = true
        default:
            // Can not yet aquire a spot, throttle for a set duration.
            time.Sleep(sem.AquireBuffer)
        }
    }
    return
}
// Release will release a spot for another Goroutine to take.
// It accepts the current value of the remaining point balance; the stored
// balance will only be updated if the value is greater than -1.
// If the remaining points are below the set threshold, a pause will be
// initiated and the duration of this pause will be calculated based
// upon several factors surrounding the point information such as the limit,
// threshold, and the refill rate.
func (sem *Semaphore) Release(pts int32) {
    sem.mu.Lock()
    defer sem.mu.Unlock()

    sem.Update(pts)
    if sem.AtThreshold() {
        // Calculate the duration required to refill and make sure that duration
        // has passed since the last pause before we call for another pause.
        ra := sem.RefillDuration() + sem.PauseBuffer
        if sem.pausedAt.Add(ra).Before(time.Now()) {
            sem.paused = true
            sem.pausedAt = time.Now()
            go sem.PauseFunc(pts, ra)
            // Unflag as paused after the determined duration and run the ResumeFunc.
            go func() {
                time.Sleep(ra)
                sem.paused = false
                sem.ResumeFunc()
            }()
        }
    }
    // Perform the actual release.
    <-sem.sema
}
// WithPauseFunc is a functional option for Semaphore to call when
// a pause happens. The point balance remaining and the duration of
// the pause will be passed into the function.
func WithPauseFunc(fn func(int32, time.Duration)) func(*Semaphore) {
    return func(sem *Semaphore) {
        sem.PauseFunc = fn
    }
}
// WithResumeFunc is a functional option for Semaphore to call when
// a resume from a pause happens.
func WithResumeFunc(fn func()) func(*Semaphore) {
    return func(sem *Semaphore) {
        sem.ResumeFunc = fn
    }
}
// WithAquireBuffer is a functional option for Semaphore which
// will set the throttle duration for attempting to re-aquire a spot.
func WithAquireBuffer(dur time.Duration) func(*Semaphore) {
    return func(sem *Semaphore) {
        sem.AquireBuffer = dur
    }
}
// WithPauseBuffer is a functional option for Semaphore which
// will set an additional duration to append to the pause duration.
func WithPauseBuffer(dur time.Duration) func(*Semaphore) {
    return func(sem *Semaphore) {
        sem.PauseBuffer = dur
    }
}
Example usage:
package main
import (
    "context"
    "log"
    "sync"
    "time"

    ssem "github.com/gniktr/shopifysemaphore"
)
func work(id int, wg *sync.WaitGroup, ctx context.Context, sem *ssem.Semaphore) {
    err := sem.Aquire(ctx)
    if err != nil {
        // Possible context timeout.
        log.Printf("work: %v\n", err)
        wg.Done()
        return
    }

    // Return remaining points from the call.
    points, err := graphQLCall()
    if err != nil {
        log.Printf("work: %v\n", err)

        // If the error is a network error or bad request for example, essentially
        // any error which would cause the response to *not* return point information,
        // then you should set the points to ErrPts to not trigger a point
        // update in Balance.
        points = ssem.ErrPts
    }
    log.Printf("remaining: %d points\n", points)

    wg.Done()
    sem.Release(points)
}
func main() {
    log.Println("started!")
    done := make(chan bool)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    // Semaphore with a concurrent capacity of 10.
    // Including a point balance setup with a threshold to pause at 200 points,
    // a maximum of 2000 points available, and a refill rate of 100 points per second.
    sem := ssem.NewSemaphore(
        10,
        ssem.NewBalance(200, 2000, 100),
        ssem.WithPauseFunc(func(pts int32, dur time.Duration) {
            log.Printf("pausing for %s due to remaining points of %d...\n", dur, pts)
        }),
        ssem.WithResumeFunc(func() {
            log.Println("resuming...")
        }),
    )
    // Run 100 Goroutines.
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go work(i, &wg, ctx, sem)
    }
    // Wait for completion of Goroutines.
    go func() {
        wg.Wait()
        done <- true
    }()

    select {
    case <-ctx.Done():
        log.Println("timeout happened.")
    case <-done:
        log.Println("work finished.")
    }
    log.Println("completed.")
}
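The graphQLCall function in the example is intentionally left undefined; it stands in for whatever Shopify Admin GraphQL request the job performs. As a rough sketch of what such a helper could look like, the version below reads the remaining points from the throttle status in the response's cost extensions. The endpoint, API version, query body, and token handling are illustrative assumptions, and it assumes "encoding/json", "net/http", "os", and "strings" are added to the example's imports.
// graphQLCall is a placeholder for the real Shopify Admin GraphQL request.
// This sketch only shows the part relevant to the semaphore: reading the
// remaining points from the response's cost extensions.
func graphQLCall() (int32, error) {
    // Placeholder query; the real integration would send the SKU lookup
    // query or the inventory mutation here.
    body := strings.NewReader(`{"query": "{ shop { name } }"}`)

    req, err := http.NewRequest(http.MethodPost,
        "https://example.myshopify.com/admin/api/2024-01/graphql.json", body)
    if err != nil {
        return ssem.ErrPts, err
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Shopify-Access-Token", os.Getenv("SHOPIFY_ACCESS_TOKEN"))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return ssem.ErrPts, err
    }
    defer resp.Body.Close()

    // Decode only the cost extensions; the data payload is ignored here.
    var out struct {
        Extensions struct {
            Cost struct {
                ThrottleStatus struct {
                    CurrentlyAvailable float64 `json:"currentlyAvailable"`
                } `json:"throttleStatus"`
            } `json:"cost"`
        } `json:"extensions"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
        return ssem.ErrPts, err
    }
    return int32(out.Extensions.Cost.ThrottleStatus.CurrentlyAvailable), nil
}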
Example output:
started!
remaining: 1840 points
remaining: 1710 points
remaining: 1660 points
...
remaining: 280 points
remaining: 190 points
pausing for 18 seconds due to remaining points of 190...
resuming...
remaining: 1890 points
remaining: 1810 points
...
work finished.
completed.
Using the example semaphore method above, we allow 10 of the 100 Goroutines to run concurrently, and upon each Goroutine's completion, it reports the remaining points back to the release mechanism, which determines whether a pause is needed before actually issuing the release.
In the context of the integration application project, it was a success! The previously mentioned 3,000-3,500 inventory updates were processed within 3.5-4.5 minutes without hitting the threshold often.
Hopefully this is helpful to those looking to do something similar. I have released this method as a Go package, which you can find here on GitHub.