You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.
 

180 lines
4.3 KiB

  1. // Copyright 2014 The Macaron Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package cache
  15. import (
  16. "crypto/md5"
  17. "database/sql"
  18. "encoding/hex"
  19. "log"
  20. "time"
  21. _ "github.com/lib/pq"
  22. "github.com/tango-contrib/cache"
  23. )
  24. // PostgresCacher represents a postgres cache adapter implementation.
  25. type PostgresCacher struct {
  26. c *sql.DB
  27. interval int
  28. }
  29. // NewPostgresCacher creates and returns a new postgres cacher.
  30. func NewPostgresCacher() *PostgresCacher {
  31. return &PostgresCacher{}
  32. }
  33. func (c *PostgresCacher) md5(key string) string {
  34. m := md5.Sum([]byte(key))
  35. return hex.EncodeToString(m[:])
  36. }
  37. // Put puts value into cache with key and expire time.
  38. // If expired is 0, it will be deleted by next GC operation.
  39. func (c *PostgresCacher) Put(key string, val interface{}, expire int64) error {
  40. item := &cache.Item{Val: val}
  41. data, err := cache.EncodeGob(item)
  42. if err != nil {
  43. return err
  44. }
  45. now := time.Now().Unix()
  46. if c.IsExist(key) {
  47. _, err = c.c.Exec("UPDATE cache SET data=$1, created=$2, expire=$3 WHERE key=$4", data, now, expire, c.md5(key))
  48. } else {
  49. _, err = c.c.Exec("INSERT INTO cache(key,data,created,expire) VALUES($1,$2,$3,$4)", c.md5(key), data, now, expire)
  50. }
  51. return err
  52. }
  53. func (c *PostgresCacher) read(key string) (*cache.Item, error) {
  54. var (
  55. data []byte
  56. created int64
  57. expire int64
  58. )
  59. err := c.c.QueryRow("SELECT data,created,expire FROM cache WHERE key=$1", c.md5(key)).Scan(&data, &created, &expire)
  60. if err != nil {
  61. return nil, err
  62. }
  63. item := new(cache.Item)
  64. if err = cache.DecodeGob(data, item); err != nil {
  65. return nil, err
  66. }
  67. item.Created = created
  68. item.Expire = expire
  69. return item, nil
  70. }
  71. // Get gets cached value by given key.
  72. func (c *PostgresCacher) Get(key string) interface{} {
  73. item, err := c.read(key)
  74. if err != nil {
  75. return nil
  76. }
  77. if item.Expire > 0 &&
  78. (time.Now().Unix()-item.Created) >= item.Expire {
  79. c.Delete(key)
  80. return nil
  81. }
  82. return item.Val
  83. }
  84. // Delete deletes cached value by given key.
  85. func (c *PostgresCacher) Delete(key string) error {
  86. _, err := c.c.Exec("DELETE FROM cache WHERE key=$1", c.md5(key))
  87. return err
  88. }
  89. // Incr increases cached int-type value by given key as a counter.
  90. func (c *PostgresCacher) Incr(key string) error {
  91. item, err := c.read(key)
  92. if err != nil {
  93. return err
  94. }
  95. item.Val, err = cache.Incr(item.Val)
  96. if err != nil {
  97. return err
  98. }
  99. return c.Put(key, item.Val, item.Expire)
  100. }
  101. // Decrease cached int value.
  102. func (c *PostgresCacher) Decr(key string) error {
  103. item, err := c.read(key)
  104. if err != nil {
  105. return err
  106. }
  107. item.Val, err = cache.Decr(item.Val)
  108. if err != nil {
  109. return err
  110. }
  111. return c.Put(key, item.Val, item.Expire)
  112. }
  113. // IsExist returns true if cached value exists.
  114. func (c *PostgresCacher) IsExist(key string) bool {
  115. var data []byte
  116. err := c.c.QueryRow("SELECT data FROM cache WHERE key=$1", c.md5(key)).Scan(&data)
  117. if err != nil && err != sql.ErrNoRows {
  118. panic("cache/postgres: error checking existence: " + err.Error())
  119. }
  120. return err != sql.ErrNoRows
  121. }
  122. // Flush deletes all cached data.
  123. func (c *PostgresCacher) Flush() error {
  124. _, err := c.c.Exec("DELETE FROM cache")
  125. return err
  126. }
  127. func (c *PostgresCacher) startGC() {
  128. if c.interval < 1 {
  129. return
  130. }
  131. if _, err := c.c.Exec("DELETE FROM cache WHERE EXTRACT(EPOCH FROM NOW()) - created >= expire"); err != nil {
  132. log.Printf("cache/postgres: error garbage collecting: %v", err)
  133. }
  134. time.AfterFunc(time.Duration(c.interval)*time.Second, func() { c.startGC() })
  135. }
  136. // StartAndGC starts GC routine based on config string settings.
  137. func (c *PostgresCacher) StartAndGC(opt cache.Options) (err error) {
  138. c.interval = opt.Interval
  139. c.c, err = sql.Open("postgres", opt.AdapterConfig)
  140. if err != nil {
  141. return err
  142. } else if err = c.c.Ping(); err != nil {
  143. return err
  144. }
  145. go c.startGC()
  146. return nil
  147. }
  148. func init() {
  149. cache.Register("postgres", NewPostgresCacher())
  150. }