...

Source file src/github.com/growthbook/growthbook-golang/feature_repository.go

Documentation: github.com/growthbook/growthbook-golang

     1  package growthbook
     2  
     3  import (
     4  	"encoding/json"
     5  	"io/ioutil"
     6  	"math"
     7  	"math/rand"
     8  	"net/http"
     9  	"sync"
    10  	"time"
    11  
    12  	"github.com/ian-ross/sse/v2"
    13  )
    14  
// RepositoryKey is the name of a feature repository. It is used as
// the key type in various maps. The key for a given repository is of
// the form "<apiHost>||<clientKey>".
type RepositoryKey string
    20  
// Cache is the interface for feature caching. Implementations store
// one CacheEntry per repository key.
type Cache interface {
	// Initialize is called before each cache lookup made when
	// fetching features.
	Initialize()
	// Clear is called on the current cache when it is replaced via
	// ConfigureCache.
	Clear()
	// Get returns the entry stored for key, or nil if there is none.
	Get(key RepositoryKey) *CacheEntry
	// Set stores entry under key.
	Set(key RepositoryKey, entry *CacheEntry)
}
    29  
// CacheEntry is the entry type for the feature cache.
type CacheEntry struct {
	// Data is the feature API response held by this entry.
	Data *FeatureAPIResponse `json:"data"`
	// Version is the DateUpdated value of the response; it is used to
	// detect whether newly received feature data differs from the
	// cached data.
	Version time.Time `json:"version"`
	// StaleAt is the time after which the entry is considered stale.
	StaleAt time.Time `json:"stale_at"`
}
    37  
    38  // Set feature cache. Passing nil uses the default in-memory cache.
    39  
    40  func ConfigureCache(c Cache) {
    41  	if c == nil {
    42  		c = &repoCache{}
    43  	}
    44  	cache.Clear()
    45  	cache = c
    46  }
    47  
    48  // ConfigureCacheBackgroundSync enables or disables background cache
    49  // synchronization.
    50  
    51  func ConfigureCacheBackgroundSync(bgSync bool) {
    52  	cacheBackgroundSync = bgSync
    53  	if !bgSync {
    54  		clearAutoRefresh()
    55  	}
    56  }
    57  
    58  // -----------------------------------------------------------------------------
    59  //
    60  //  PRIVATE FUNCTIONS START HERE
    61  
    62  // repoRefreshFeatures fetches features from the GrowthBook API and
    63  // updates the calling GrowthBook instances as required.
    64  
    65  func repoRefreshFeatures(gb *GrowthBook, timeout time.Duration,
    66  	skipCache bool, allowStale bool, updateInstance bool) {
    67  	data := fetchFeaturesWithCache(gb, timeout, allowStale, skipCache)
    68  	if updateInstance && data != nil {
    69  		refreshInstance(gb.inner, data)
    70  	}
    71  }
    72  
    73  func repoLatestUpdate(gb *GrowthBook) *time.Time {
    74  	key := getKey(gb)
    75  	existing := cache.Get(key)
    76  	if existing == nil {
    77  		return nil
    78  	}
    79  	return &existing.Version
    80  }
    81  
    82  // RepoSubscribe adds a subscription for automatic feature updates for
    83  // a GrowthBook instance. Feature values for the instance are updated
    84  // transparently when new values are retrieved from the API (either by
    85  // explicit requests or via SSE updates).
    86  
    87  func repoSubscribe(gb *GrowthBook) { refresh.addSubscription(gb) }
    88  
    89  // RepoUnsubscribe removes a subscription for automatic feature
    90  // updates for a GrowthBook instance.
    91  
    92  func repoUnsubscribe(gb *GrowthBook) { refresh.removeSubscription(gb) }
    93  
// configureCacheStaleTTL sets the time-to-live duration for cache
// entries. The value is added to the current time to compute the
// StaleAt of an entry whenever new feature data is processed.
func configureCacheStaleTTL(ttl time.Duration) {
	cacheStaleTTL = ttl
}
   100  
   101  // Top-level feature fetching function. Responsible for caching,
   102  // starting background refresh goroutines, and timeout management for
   103  // API request, which is handed off to fetchFeatures.
   104  
   105  func fetchFeaturesWithCache(gb *GrowthBook, timeout time.Duration,
   106  	allowStale bool, skipCache bool) *FeatureAPIResponse {
   107  	key := getKey(gb)
   108  	now := time.Now()
   109  	cache.Initialize()
   110  	existing := cache.Get(key)
   111  
   112  	if existing != nil && !skipCache && (allowStale || existing.StaleAt.After(now)) {
   113  		if existing.StaleAt.Before(now) {
   114  			// Reload features in the backgroud if stale
   115  			go fetchFeatures(gb)
   116  		} else {
   117  			// Otherwise, if we don't need to refresh now, start a
   118  			// background sync.
   119  			refresh.runBackgroundRefresh(gb)
   120  		}
   121  		return existing.Data
   122  	} else {
   123  		// Perform API request with timeout.
   124  		if timeout == 0 {
   125  			return fetchFeatures(gb)
   126  		}
   127  		ch := make(chan *FeatureAPIResponse, 1)
   128  		timer := time.NewTimer(timeout)
   129  		go func() {
   130  			ch <- fetchFeatures(gb)
   131  		}()
   132  		select {
   133  		case result := <-ch:
   134  			return result
   135  		case <-timer.C:
   136  			return nil
   137  		}
   138  	}
   139  }
   140  
   141  // Mutex-protected map holding channels to concurrent requests for
   142  // features for the same repository key. Only one real HTTP request is
   143  // in flight at any time for a given repository key.
   144  
   145  var outstandingRequestMutex sync.Mutex
   146  var outstandingRequest map[RepositoryKey][]chan *FeatureAPIResponse
   147  
   148  // We need to be able to clear the outstanding requests when the cache
   149  // is cleared.
   150  
   151  func clearOutstandingRequests() {
   152  	outstandingRequestMutex.Lock()
   153  	defer outstandingRequestMutex.Unlock()
   154  
   155  	outstandingRequest = make(map[RepositoryKey][]chan *FeatureAPIResponse)
   156  }
   157  
   158  // Retrieve features from the API, ensuring that only one request for
   159  // any given repository key is in flight at any time.
   160  
   161  func fetchFeatures(gb *GrowthBook) *FeatureAPIResponse {
   162  	apiHost, clientKey := gb.GetAPIInfo()
   163  	key := makeKey(apiHost, clientKey)
   164  
   165  	// Get outstanding request channel, and flag to indicate whether
   166  	// this is the first channel created for this key.
   167  	myChan, first := addRequestChan(key)
   168  
   169  	// Either:
   170  	var apiResponse *FeatureAPIResponse
   171  	if first {
   172  		// We were the first request to come in, so perform the API
   173  		// request, and...
   174  		apiResponse = doFetchRequest(gb)
   175  
   176  		// ...retrieve a list of channels to other goroutines requesting
   177  		// features for the same repository key, clearing the outstanding
   178  		// requests slot for this repository key...
   179  		chans := removeRequestChan(key)
   180  
   181  		// ...then send the API response to all the waiting goroutines. We
   182  		// check that our channel is still in the list, in case the cache
   183  		// and the outstanding requests information has been cleared while
   184  		// we were making the request.
   185  		selfFound := false
   186  		for _, ch := range chans {
   187  			if ch != myChan {
   188  				ch <- apiResponse
   189  			} else {
   190  				// Don't send to ourselves, but record that our channel is
   191  				// still in the list.
   192  				selfFound = true
   193  			}
   194  		}
   195  
   196  		// Finally call the new feature data callback (from a single
   197  		// goroutine), assuming that the outstanding requests list hasn't
   198  		// been cleared in the meantime.
   199  		if apiResponse != nil && selfFound {
   200  			onNewFeatureData(key, apiResponse)
   201  			refresh.runBackgroundRefresh(gb)
   202  		}
   203  	} else {
   204  		// We were a later request, so just wait for the result from the
   205  		// goroutine performing the request on our channel.
   206  		apiResponse = <-myChan
   207  	}
   208  
   209  	// If something went wrong, we return an empty response, rather than
   210  	// nil.
   211  	if apiResponse == nil {
   212  		apiResponse = &FeatureAPIResponse{}
   213  	}
   214  	return apiResponse
   215  }
   216  
   217  // The first request for a given repository key will put a nil channel
   218  // value into the relevant slot of the outstandingRequest map.
   219  // Subsequent requests for the same repository key that come in while
   220  // the first request is being processed will create a channel to
   221  // receive the results from the in flight request.
   222  
   223  func addRequestChan(key RepositoryKey) (chan *FeatureAPIResponse, bool) {
   224  	outstandingRequestMutex.Lock()
   225  	defer outstandingRequestMutex.Unlock()
   226  
   227  	if outstandingRequest == nil {
   228  		outstandingRequest = make(map[RepositoryKey][]chan *FeatureAPIResponse)
   229  	}
   230  	chans := outstandingRequest[key]
   231  	myChan := make(chan *FeatureAPIResponse)
   232  	first := false
   233  	if chans == nil {
   234  		first = true
   235  		outstandingRequest[key] = []chan *FeatureAPIResponse{}
   236  	}
   237  	outstandingRequest[key] = append(outstandingRequest[key], myChan)
   238  
   239  	return myChan, first
   240  }
   241  
   242  // Remove the request channel for a given key.
   243  
   244  func removeRequestChan(key RepositoryKey) []chan *FeatureAPIResponse {
   245  	outstandingRequestMutex.Lock()
   246  	defer outstandingRequestMutex.Unlock()
   247  
   248  	chans := outstandingRequest[key]
   249  	delete(outstandingRequest, key)
   250  	return chans
   251  }
   252  
   253  // Actually do the HTTP request to get feature data.
   254  
   255  func doFetchRequest(gb *GrowthBook) *FeatureAPIResponse {
   256  	apiHost, clientKey := gb.GetAPIInfo()
   257  	key := makeKey(apiHost, clientKey)
   258  	endpoint := apiHost + "/api/features/" + clientKey
   259  
   260  	resp, err := http.Get(endpoint)
   261  	if err != nil {
   262  		logErrorf("Error fetching features: HTTP error: endpoint=%s error=%v",
   263  			endpoint, err)
   264  		return nil
   265  	}
   266  	if resp.Body != nil {
   267  		defer resp.Body.Close()
   268  	}
   269  	if resp.StatusCode != http.StatusOK {
   270  		body, err := ioutil.ReadAll(resp.Body)
   271  		if err != nil || len(body) == 0 {
   272  			body = []byte("<none>")
   273  		}
   274  		logErrorf("Error fetching features: HTTP error: endpoint=%s status=%d body=%s",
   275  			endpoint, resp.StatusCode, string(body))
   276  		return nil
   277  	}
   278  
   279  	body, err := ioutil.ReadAll(resp.Body)
   280  	if err != nil {
   281  		logErrorf("Error fetching features: reading response body: %v", err)
   282  		return nil
   283  	}
   284  
   285  	apiResponse := ParseFeatureAPIResponse(body)
   286  	if apiResponse == nil {
   287  		logErrorf("Error fetching features: parsing response: %v", err)
   288  		return nil
   289  	}
   290  
   291  	// Record whether this endpoint supports SSE updates.
   292  	sse, ok := resp.Header["X-Sse-Support"]
   293  	refresh.sseSupported(key, ok && sse[0] == "enabled")
   294  
   295  	return apiResponse
   296  }
   297  
   298  // Update values on the inner growthBookData data structures of
   299  // GrowthBook instances. See the comment on the New function in
   300  // growthbook.go for an explanation.
   301  
   302  func refreshInstance(inner *growthBookData, data *FeatureAPIResponse) {
   303  	if data.EncryptedFeatures != "" {
   304  		err := inner.withEncryptedFeatures(data.EncryptedFeatures, "")
   305  		if err != nil {
   306  			logError("failed to decrypt encrypted features")
   307  		}
   308  	} else {
   309  		features := data.Features
   310  		if features == nil {
   311  			features = inner.features()
   312  		}
   313  		inner.withFeatures(features)
   314  	}
   315  }
   316  
   317  // Callback to process feature updates from API, via both explicit
   318  // requests and background processing.
   319  
   320  func onNewFeatureData(key RepositoryKey, data *FeatureAPIResponse) {
   321  	// If contents haven't changed, ignore the update and extend the
   322  	// stale TTL.
   323  	version := data.DateUpdated
   324  	now := time.Now()
   325  	staleAt := now.Add(cacheStaleTTL)
   326  	existing := cache.Get(key)
   327  	if existing != nil && existing.Version == version {
   328  		existing.StaleAt = staleAt
   329  		return
   330  	}
   331  
   332  	// Update in-memory cache.
   333  	cache.Set(key, &CacheEntry{data, version, staleAt})
   334  
   335  	// Update features for all subscribed GrowthBook instances.
   336  	for _, inner := range refresh.instances(key) {
   337  		refreshInstance(inner, data)
   338  	}
   339  }
   340  
   341  // -----------------------------------------------------------------------------
   342  //
   343  //  AUTO-REFRESH PROCESSING
   344  
// gbDataSet is a set of inner data structures of GrowthBook
// instances. We store *only* the inner data structure of GrowthBook
// instances here, so that the finalizer added to the main (outer)
// GrowthBook instances will run, triggering an unsubscribe, allowing
// us to remove the inner data structure here.
type gbDataSet map[*growthBookData]bool
   350  
// refreshData holds the state of the auto-refresh machinery. The
// embedded RWMutex guards all of the maps below.
type refreshData struct {
	sync.RWMutex

	// Repository keys where SSE is supported.
	sse map[RepositoryKey]bool

	// Channels to shut down SSE refresh goroutines.
	shutdown map[RepositoryKey]chan struct{}

	// Subscribed instances.
	subscribed map[RepositoryKey]gbDataSet
}
   363  
   364  func makeRefreshData() *refreshData {
   365  	return &refreshData{
   366  		sse:        make(map[RepositoryKey]bool),
   367  		shutdown:   make(map[RepositoryKey]chan struct{}),
   368  		subscribed: make(map[RepositoryKey]gbDataSet),
   369  	}
   370  }
   371  
   372  var refresh *refreshData = makeRefreshData()
   373  
// clearAutoRefresh signals the running refresh goroutines to shut
// down and then replaces the package-wide auto-refresh state with a
// fresh, empty one (dropping SSE support flags and subscriptions).
func clearAutoRefresh() {
	refresh.stop()
	refresh = makeRefreshData()
}
   378  
   379  // Safely get list of GrowthBook instance inner data structures for a
   380  // repository key.
   381  
   382  func (r *refreshData) instances(key RepositoryKey) []*growthBookData {
   383  	r.RLock()
   384  	defer r.RUnlock()
   385  
   386  	m := r.subscribed[key]
   387  	if m == nil {
   388  		return []*growthBookData{}
   389  	}
   390  	result := make([]*growthBookData, len(m))
   391  	i := 0
   392  	for k := range m {
   393  		result[i] = k
   394  		i++
   395  	}
   396  	return result
   397  }
   398  
   399  // Shut down data refresh machinery.
   400  
   401  func (r *refreshData) stop() {
   402  	r.Lock()
   403  	defer r.Unlock()
   404  
   405  	for _, ch := range r.shutdown {
   406  		ch <- struct{}{}
   407  	}
   408  }
   409  
   410  // Add a subscription.
   411  
   412  func (r *refreshData) addSubscription(gb *GrowthBook) {
   413  	r.Lock()
   414  	defer r.Unlock()
   415  
   416  	key := getKey(gb)
   417  	subs := r.subscribed[key]
   418  	if subs == nil {
   419  		subs = make(gbDataSet)
   420  	}
   421  	subs[gb.inner] = true
   422  	r.subscribed[key] = subs
   423  }
   424  
   425  // Remove a subscription. Also closes down the auto-refresh goroutine
   426  // if there is one and this is the last subscriber.
   427  
   428  func (r *refreshData) removeSubscription(gb *GrowthBook) {
   429  	r.Lock()
   430  	defer r.Unlock()
   431  
   432  	key := getKey(gb)
   433  	subs := r.subscribed[key]
   434  	if subs != nil {
   435  		delete(subs, gb.inner)
   436  		if len(subs) == 0 {
   437  			subs = nil
   438  		}
   439  	}
   440  	r.subscribed[key] = subs
   441  
   442  	if subs == nil {
   443  		ch := r.shutdown[key]
   444  		if ch != nil {
   445  			ch <- struct{}{}
   446  			delete(r.shutdown, key)
   447  		}
   448  	}
   449  }
   450  
// sseSupported records whether the repository identified by key
// supports SSE updates.
func (r *refreshData) sseSupported(key RepositoryKey, supported bool) {
	r.Lock()
	defer r.Unlock()

	r.sse[key] = supported
}
   457  
   458  func (r *refreshData) runBackgroundRefresh(gb *GrowthBook) {
   459  	r.Lock()
   460  	defer r.Unlock()
   461  
   462  	key := getKey(gb)
   463  
   464  	// Conditions required to proceed here:
   465  	//  - Background sync must be enabled.
   466  	//  - The repository must support SSE.
   467  	//  - Background sync must not already be running for the repository.
   468  	if !cacheBackgroundSync || !r.sse[key] || r.shutdown[key] != nil {
   469  		return
   470  	}
   471  
   472  	ch := make(chan struct{})
   473  	refresh.shutdown[key] = ch
   474  	go refreshFromSSE(gb, ch)
   475  }
   476  
   477  func refreshFromSSE(gb *GrowthBook, shutdown chan struct{}) {
   478  	apiHost, clientKey := gb.GetAPIInfo()
   479  	key := makeKey(apiHost, clientKey)
   480  
   481  	var client *sse.Client
   482  	ch := make(chan *sse.Event)
   483  	reconnect := make(chan struct{}, 1)
   484  	reconnect <- struct{}{}
   485  	var errors int
   486  
   487  	for {
   488  		select {
   489  		case <-shutdown:
   490  			return
   491  
   492  		case <-reconnect:
   493  			logInfof("Connecting to SSE stream: %s", key)
   494  			errors = 0
   495  			client = sse.NewClient(apiHost + "/sub/" + clientKey)
   496  			client.OnDisconnect(func(c *sse.Client) {
   497  				logWarnf("SSE event stream disconnected: %s", key)
   498  				c.Unsubscribe(ch)
   499  				reconnect <- struct{}{}
   500  			})
   501  			err := client.SubscribeChan("features", ch)
   502  			if err != nil {
   503  				logErrorf("Connecting to SSE stream: %v", err)
   504  				return
   505  			}
   506  
   507  		case msg := <-ch:
   508  			if len(msg.Data) == 0 {
   509  				continue
   510  			}
   511  			var data FeatureAPIResponse
   512  			err := json.Unmarshal(msg.Data, &data)
   513  
   514  			if err != nil {
   515  				logErrorf("SSE error: %s (key: %s)", err.Error(), key)
   516  			}
   517  			if err != nil && client != nil {
   518  				errors++
   519  				if errors > 3 {
   520  					logErrorf("Multiple SSE errors: disconnecting stream: %s", key)
   521  					client.Unsubscribe(ch)
   522  					client = nil
   523  
   524  					// Exponential backoff after 4 errors, with jitter.
   525  					msDelay := math.Pow(3, float64(errors-3)) * (1000 + rand.Float64()*1000)
   526  					delay := time.Duration(msDelay) * time.Millisecond
   527  
   528  					// 5 minutes max.
   529  					if delay > 5*time.Minute {
   530  						delay = 5 * time.Minute
   531  					}
   532  					logWarnf("Waiting to reconnect SSE stream: %s (delaying %s)", key, delay)
   533  					time.Sleep(delay)
   534  					reconnect <- struct{}{}
   535  				}
   536  				continue
   537  			}
   538  			onNewFeatureData(key, &data)
   539  		}
   540  	}
   541  }
   542  
   543  // -----------------------------------------------------------------------------
   544  //
   545  //  CACHING
   546  //
   547  
   548  // Cache control parameters.
   549  
   550  var cacheBackgroundSync bool = true
   551  var cacheStaleTTL time.Duration = 60 * time.Second
   552  
// repoCache is the default in-memory cache. The embedded RWMutex
// guards the data map.
type repoCache struct {
	sync.RWMutex
	data map[RepositoryKey]*CacheEntry
}
   559  
   560  var cache Cache = &repoCache{data: map[RepositoryKey]*CacheEntry{}}
   561  
// Initialize does nothing for the in-memory cache.
func (c *repoCache) Initialize() {}
   563  
// Clear empties the in-memory cache. It also resets the auto-refresh
// state and the table of outstanding requests.
func (c *repoCache) Clear() {
	c.Lock()
	defer c.Unlock()

	// Clear cache, auto-refresh info and outstanding requests.
	c.data = make(map[RepositoryKey]*CacheEntry)
	clearAutoRefresh()
	clearOutstandingRequests()
}
   573  
// Get returns the cache entry stored for key, or nil if there is
// none.
func (c *repoCache) Get(key RepositoryKey) *CacheEntry {
	c.RLock()
	defer c.RUnlock()

	return c.data[key]
}
   580  
   581  func (c *repoCache) Set(key RepositoryKey, entry *CacheEntry) {
   582  	c.Lock()
   583  	defer c.Unlock()
   584  
   585  	c.data[key] = entry
   586  }
   587  
   588  // -----------------------------------------------------------------------------
   589  //
   590  //  REPOSITORY KEY UTILITIES
   591  
   592  func getKey(gb *GrowthBook) RepositoryKey {
   593  	apiHost, clientKey := gb.GetAPIInfo()
   594  	return RepositoryKey(apiHost + "||" + clientKey)
   595  }
   596  
   597  func makeKey(apiHost string, clientKey string) RepositoryKey {
   598  	return RepositoryKey(apiHost + "||" + clientKey)
   599  }
   600  

View as plain text