1 package growthbook
2
3 import (
4 "encoding/json"
5 "io/ioutil"
6 "math"
7 "math/rand"
8 "net/http"
9 "sync"
10 "time"
11
12 "github.com/ian-ross/sse/v2"
13 )
14
15
16
17
18
// RepositoryKey identifies one feature source in the cache and in the
// refresh bookkeeping. Keys are built by getKey and makeKey as
// apiHost + "||" + clientKey.
type RepositoryKey string
20
21
22
// Cache is the storage used for fetched feature data. The active
// implementation is held in the package-level cache variable and can be
// replaced with ConfigureCache.
type Cache interface {
	// Initialize is called by fetchFeaturesWithCache before every lookup.
	Initialize()
	// Clear is called by ConfigureCache on the cache that is being
	// replaced.
	Clear()
	// Get returns the entry stored for key. Callers in this file treat a
	// nil result as "no entry".
	Get(key RepositoryKey) *CacheEntry
	// Set stores entry under key.
	Set(key RepositoryKey, entry *CacheEntry)
}
29
30
31
// CacheEntry is the value stored in a Cache for one RepositoryKey.
type CacheEntry struct {
	// Data is the feature API response that was cached.
	Data *FeatureAPIResponse `json:"data"`

	// Version is the DateUpdated value of Data, recorded by
	// onNewFeatureData.
	Version time.Time `json:"version"`

	// StaleAt is the time after which fetchFeaturesWithCache treats the
	// entry as stale.
	StaleAt time.Time `json:"stale_at"`
}
37
38
39
40 func ConfigureCache(c Cache) {
41 if c == nil {
42 c = &repoCache{}
43 }
44 cache.Clear()
45 cache = c
46 }
47
48
49
50
51 func ConfigureCacheBackgroundSync(bgSync bool) {
52 cacheBackgroundSync = bgSync
53 if !bgSync {
54 clearAutoRefresh()
55 }
56 }
57
58
59
60
61
62
63
64
65 func repoRefreshFeatures(gb *GrowthBook, timeout time.Duration,
66 skipCache bool, allowStale bool, updateInstance bool) {
67 data := fetchFeaturesWithCache(gb, timeout, allowStale, skipCache)
68 if updateInstance && data != nil {
69 refreshInstance(gb.inner, data)
70 }
71 }
72
73 func repoLatestUpdate(gb *GrowthBook) *time.Time {
74 key := getKey(gb)
75 existing := cache.Get(key)
76 if existing == nil {
77 return nil
78 }
79 return &existing.Version
80 }
81
82
83
84
85
86
87 func repoSubscribe(gb *GrowthBook) { refresh.addSubscription(gb) }
88
89
90
91
92 func repoUnsubscribe(gb *GrowthBook) { refresh.removeSubscription(gb) }
93
94
95
96
// configureCacheStaleTTL sets cacheStaleTTL, the duration that
// onNewFeatureData adds to the current time to compute a cache entry's
// StaleAt.
func configureCacheStaleTTL(ttl time.Duration) {
	cacheStaleTTL = ttl
}
100
101
102
103
104
105 func fetchFeaturesWithCache(gb *GrowthBook, timeout time.Duration,
106 allowStale bool, skipCache bool) *FeatureAPIResponse {
107 key := getKey(gb)
108 now := time.Now()
109 cache.Initialize()
110 existing := cache.Get(key)
111
112 if existing != nil && !skipCache && (allowStale || existing.StaleAt.After(now)) {
113 if existing.StaleAt.Before(now) {
114
115 go fetchFeatures(gb)
116 } else {
117
118
119 refresh.runBackgroundRefresh(gb)
120 }
121 return existing.Data
122 } else {
123
124 if timeout == 0 {
125 return fetchFeatures(gb)
126 }
127 ch := make(chan *FeatureAPIResponse, 1)
128 timer := time.NewTimer(timeout)
129 go func() {
130 ch <- fetchFeatures(gb)
131 }()
132 select {
133 case result := <-ch:
134 return result
135 case <-timer.C:
136 return nil
137 }
138 }
139 }
140
141
142
143
144
145 var outstandingRequestMutex sync.Mutex
146 var outstandingRequest map[RepositoryKey][]chan *FeatureAPIResponse
147
148
149
150
151 func clearOutstandingRequests() {
152 outstandingRequestMutex.Lock()
153 defer outstandingRequestMutex.Unlock()
154
155 outstandingRequest = make(map[RepositoryKey][]chan *FeatureAPIResponse)
156 }
157
158
159
160
161 func fetchFeatures(gb *GrowthBook) *FeatureAPIResponse {
162 apiHost, clientKey := gb.GetAPIInfo()
163 key := makeKey(apiHost, clientKey)
164
165
166
167 myChan, first := addRequestChan(key)
168
169
170 var apiResponse *FeatureAPIResponse
171 if first {
172
173
174 apiResponse = doFetchRequest(gb)
175
176
177
178
179 chans := removeRequestChan(key)
180
181
182
183
184
185 selfFound := false
186 for _, ch := range chans {
187 if ch != myChan {
188 ch <- apiResponse
189 } else {
190
191
192 selfFound = true
193 }
194 }
195
196
197
198
199 if apiResponse != nil && selfFound {
200 onNewFeatureData(key, apiResponse)
201 refresh.runBackgroundRefresh(gb)
202 }
203 } else {
204
205
206 apiResponse = <-myChan
207 }
208
209
210
211 if apiResponse == nil {
212 apiResponse = &FeatureAPIResponse{}
213 }
214 return apiResponse
215 }
216
217
218
219
220
221
222
223 func addRequestChan(key RepositoryKey) (chan *FeatureAPIResponse, bool) {
224 outstandingRequestMutex.Lock()
225 defer outstandingRequestMutex.Unlock()
226
227 if outstandingRequest == nil {
228 outstandingRequest = make(map[RepositoryKey][]chan *FeatureAPIResponse)
229 }
230 chans := outstandingRequest[key]
231 myChan := make(chan *FeatureAPIResponse)
232 first := false
233 if chans == nil {
234 first = true
235 outstandingRequest[key] = []chan *FeatureAPIResponse{}
236 }
237 outstandingRequest[key] = append(outstandingRequest[key], myChan)
238
239 return myChan, first
240 }
241
242
243
244 func removeRequestChan(key RepositoryKey) []chan *FeatureAPIResponse {
245 outstandingRequestMutex.Lock()
246 defer outstandingRequestMutex.Unlock()
247
248 chans := outstandingRequest[key]
249 delete(outstandingRequest, key)
250 return chans
251 }
252
253
254
255 func doFetchRequest(gb *GrowthBook) *FeatureAPIResponse {
256 apiHost, clientKey := gb.GetAPIInfo()
257 key := makeKey(apiHost, clientKey)
258 endpoint := apiHost + "/api/features/" + clientKey
259
260 resp, err := http.Get(endpoint)
261 if err != nil {
262 logErrorf("Error fetching features: HTTP error: endpoint=%s error=%v",
263 endpoint, err)
264 return nil
265 }
266 if resp.Body != nil {
267 defer resp.Body.Close()
268 }
269 if resp.StatusCode != http.StatusOK {
270 body, err := ioutil.ReadAll(resp.Body)
271 if err != nil || len(body) == 0 {
272 body = []byte("<none>")
273 }
274 logErrorf("Error fetching features: HTTP error: endpoint=%s status=%d body=%s",
275 endpoint, resp.StatusCode, string(body))
276 return nil
277 }
278
279 body, err := ioutil.ReadAll(resp.Body)
280 if err != nil {
281 logErrorf("Error fetching features: reading response body: %v", err)
282 return nil
283 }
284
285 apiResponse := ParseFeatureAPIResponse(body)
286 if apiResponse == nil {
287 logErrorf("Error fetching features: parsing response: %v", err)
288 return nil
289 }
290
291
292 sse, ok := resp.Header["X-Sse-Support"]
293 refresh.sseSupported(key, ok && sse[0] == "enabled")
294
295 return apiResponse
296 }
297
298
299
300
301
302 func refreshInstance(inner *growthBookData, data *FeatureAPIResponse) {
303 if data.EncryptedFeatures != "" {
304 err := inner.withEncryptedFeatures(data.EncryptedFeatures, "")
305 if err != nil {
306 logError("failed to decrypt encrypted features")
307 }
308 } else {
309 features := data.Features
310 if features == nil {
311 features = inner.features()
312 }
313 inner.withFeatures(features)
314 }
315 }
316
317
318
319
320 func onNewFeatureData(key RepositoryKey, data *FeatureAPIResponse) {
321
322
323 version := data.DateUpdated
324 now := time.Now()
325 staleAt := now.Add(cacheStaleTTL)
326 existing := cache.Get(key)
327 if existing != nil && existing.Version == version {
328 existing.StaleAt = staleAt
329 return
330 }
331
332
333 cache.Set(key, &CacheEntry{data, version, staleAt})
334
335
336 for _, inner := range refresh.instances(key) {
337 refreshInstance(inner, data)
338 }
339 }
340
341
342
343
344
345
346
347
348
// gbDataSet is a set of growthBookData pointers, represented as a map
// whose keys are the members.
type gbDataSet map[*growthBookData]bool
350
// refreshData holds the auto-refresh bookkeeping for every repository
// key. Its methods take the embedded RWMutex before touching the maps.
type refreshData struct {
	sync.RWMutex

	// sse records, per key, whether the last successful feature fetch
	// reported SSE support (written by sseSupported).
	sse map[RepositoryKey]bool

	// shutdown holds, per key, the channel used to tell the
	// refreshFromSSE goroutine started for that key to exit.
	shutdown map[RepositoryKey]chan struct{}

	// subscribed holds, per key, the instances that onNewFeatureData
	// updates when new data arrives.
	subscribed map[RepositoryKey]gbDataSet
}
363
364 func makeRefreshData() *refreshData {
365 return &refreshData{
366 sse: make(map[RepositoryKey]bool),
367 shutdown: make(map[RepositoryKey]chan struct{}),
368 subscribed: make(map[RepositoryKey]gbDataSet),
369 }
370 }
371
372 var refresh *refreshData = makeRefreshData()
373
374 func clearAutoRefresh() {
375 refresh.stop()
376 refresh = makeRefreshData()
377 }
378
379
380
381
382 func (r *refreshData) instances(key RepositoryKey) []*growthBookData {
383 r.RLock()
384 defer r.RUnlock()
385
386 m := r.subscribed[key]
387 if m == nil {
388 return []*growthBookData{}
389 }
390 result := make([]*growthBookData, len(m))
391 i := 0
392 for k := range m {
393 result[i] = k
394 i++
395 }
396 return result
397 }
398
399
400
401 func (r *refreshData) stop() {
402 r.Lock()
403 defer r.Unlock()
404
405 for _, ch := range r.shutdown {
406 ch <- struct{}{}
407 }
408 }
409
410
411
412 func (r *refreshData) addSubscription(gb *GrowthBook) {
413 r.Lock()
414 defer r.Unlock()
415
416 key := getKey(gb)
417 subs := r.subscribed[key]
418 if subs == nil {
419 subs = make(gbDataSet)
420 }
421 subs[gb.inner] = true
422 r.subscribed[key] = subs
423 }
424
425
426
427
428 func (r *refreshData) removeSubscription(gb *GrowthBook) {
429 r.Lock()
430 defer r.Unlock()
431
432 key := getKey(gb)
433 subs := r.subscribed[key]
434 if subs != nil {
435 delete(subs, gb.inner)
436 if len(subs) == 0 {
437 subs = nil
438 }
439 }
440 r.subscribed[key] = subs
441
442 if subs == nil {
443 ch := r.shutdown[key]
444 if ch != nil {
445 ch <- struct{}{}
446 delete(r.shutdown, key)
447 }
448 }
449 }
450
// sseSupported records whether SSE is supported for key.
// runBackgroundRefresh consults this value before starting an SSE
// refresh goroutine.
func (r *refreshData) sseSupported(key RepositoryKey, supported bool) {
	r.Lock()
	defer r.Unlock()

	r.sse[key] = supported
}
457
458 func (r *refreshData) runBackgroundRefresh(gb *GrowthBook) {
459 r.Lock()
460 defer r.Unlock()
461
462 key := getKey(gb)
463
464
465
466
467
468 if !cacheBackgroundSync || !r.sse[key] || r.shutdown[key] != nil {
469 return
470 }
471
472 ch := make(chan struct{})
473 refresh.shutdown[key] = ch
474 go refreshFromSSE(gb, ch)
475 }
476
477 func refreshFromSSE(gb *GrowthBook, shutdown chan struct{}) {
478 apiHost, clientKey := gb.GetAPIInfo()
479 key := makeKey(apiHost, clientKey)
480
481 var client *sse.Client
482 ch := make(chan *sse.Event)
483 reconnect := make(chan struct{}, 1)
484 reconnect <- struct{}{}
485 var errors int
486
487 for {
488 select {
489 case <-shutdown:
490 return
491
492 case <-reconnect:
493 logInfof("Connecting to SSE stream: %s", key)
494 errors = 0
495 client = sse.NewClient(apiHost + "/sub/" + clientKey)
496 client.OnDisconnect(func(c *sse.Client) {
497 logWarnf("SSE event stream disconnected: %s", key)
498 c.Unsubscribe(ch)
499 reconnect <- struct{}{}
500 })
501 err := client.SubscribeChan("features", ch)
502 if err != nil {
503 logErrorf("Connecting to SSE stream: %v", err)
504 return
505 }
506
507 case msg := <-ch:
508 if len(msg.Data) == 0 {
509 continue
510 }
511 var data FeatureAPIResponse
512 err := json.Unmarshal(msg.Data, &data)
513
514 if err != nil {
515 logErrorf("SSE error: %s (key: %s)", err.Error(), key)
516 }
517 if err != nil && client != nil {
518 errors++
519 if errors > 3 {
520 logErrorf("Multiple SSE errors: disconnecting stream: %s", key)
521 client.Unsubscribe(ch)
522 client = nil
523
524
525 msDelay := math.Pow(3, float64(errors-3)) * (1000 + rand.Float64()*1000)
526 delay := time.Duration(msDelay) * time.Millisecond
527
528
529 if delay > 5*time.Minute {
530 delay = 5 * time.Minute
531 }
532 logWarnf("Waiting to reconnect SSE stream: %s (delaying %s)", key, delay)
533 time.Sleep(delay)
534 reconnect <- struct{}{}
535 }
536 continue
537 }
538 onNewFeatureData(key, &data)
539 }
540 }
541 }
542
543
544
545
546
547
548
549
var (
	// cacheBackgroundSync reports whether background refresh of cached
	// features is enabled. Set through ConfigureCacheBackgroundSync.
	cacheBackgroundSync = true

	// cacheStaleTTL is how long a cache entry stays fresh after it is
	// written. Set through configureCacheStaleTTL.
	cacheStaleTTL = 60 * time.Second
)
552
553
554
// repoCache is the in-memory Cache implementation used by default. Its
// methods take the embedded RWMutex around every access to data.
type repoCache struct {
	sync.RWMutex
	// data maps each repository key to its cached entry.
	data map[RepositoryKey]*CacheEntry
}
559
560 var cache Cache = &repoCache{data: map[RepositoryKey]*CacheEntry{}}
561
// Initialize does nothing for the in-memory cache; it exists to satisfy
// the Cache interface.
func (c *repoCache) Initialize() {}
563
564 func (c *repoCache) Clear() {
565 c.Lock()
566 defer c.Unlock()
567
568
569 c.data = make(map[RepositoryKey]*CacheEntry)
570 clearAutoRefresh()
571 clearOutstandingRequests()
572 }
573
574 func (c *repoCache) Get(key RepositoryKey) *CacheEntry {
575 c.RLock()
576 defer c.RUnlock()
577
578 return c.data[key]
579 }
580
581 func (c *repoCache) Set(key RepositoryKey, entry *CacheEntry) {
582 c.Lock()
583 defer c.Unlock()
584
585 c.data[key] = entry
586 }
587
588
589
590
591
592 func getKey(gb *GrowthBook) RepositoryKey {
593 apiHost, clientKey := gb.GetAPIInfo()
594 return RepositoryKey(apiHost + "||" + clientKey)
595 }
596
597 func makeKey(apiHost string, clientKey string) RepositoryKey {
598 return RepositoryKey(apiHost + "||" + clientKey)
599 }
600
View as plain text