XORM Redis Cache
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

316 lines
8.1 KiB

  1. package xormrediscache
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "fmt"
  6. "hash/crc32"
  7. "reflect"
  8. "time"
  9. "unsafe"
  10. "github.com/garyburd/redigo/redis"
  11. "xorm.io/xorm/caches"
  12. "xorm.io/xorm/log"
  13. )
  // Expiration sentinels interpreted by invoke, plus the tag used for log lines.
  const (
  	// DEFAULT_EXPIRATION (zero) makes invoke fall back to the cacher's
  	// configured defaultExpiration.
  	DEFAULT_EXPIRATION time.Duration = 0
  	// FOREVER_EXPIRATION (minus one) makes invoke store the key with a plain
  	// SET, i.e. without an expiry.
  	FOREVER_EXPIRATION time.Duration = -1
  	// LOGGING_PREFIX is prepended to every message written through logErrf
  	// and logDebugf.
  	LOGGING_PREFIX = "[redis_cacher]"
  )
  19. // RedisCacher wraps the Redis client to meet the Cache interface.
  20. type RedisCacher struct {
  21. pool *redis.Pool
  22. defaultExpiration time.Duration
  23. Logger log.ContextLogger
  24. }
  25. // NewRedisCacher creates a Redis Cacher, host as IP endpoint, i.e., localhost:6379, provide empty string or nil if Redis server doesn't
  26. // require AUTH command, defaultExpiration sets the expire duration for a key to live. Until redigo supports
  27. // sharding/clustering, only one host will be in hostList
  28. //
  29. // engine.SetDefaultCacher(xormrediscache.NewRedisCacher("localhost:6379", "", xormrediscache.DEFAULT_EXPIRATION, engine.Logger))
  30. //
  31. // or set MapCacher
  32. //
  33. // engine.MapCacher(&user, xormrediscache.NewRedisCacher("localhost:6379", "", xormrediscache.DEFAULT_EXPIRATION, engine.Logger))
  34. //
  35. func NewRedisCacher(host string, password string, defaultExpiration time.Duration, logger log.ContextLogger) *RedisCacher {
  36. var pool = &redis.Pool{
  37. MaxIdle: 5,
  38. IdleTimeout: 240 * time.Second,
  39. Dial: func() (redis.Conn, error) {
  40. // the redis protocol should probably be made sett-able
  41. c, err := redis.Dial("tcp", host)
  42. if err != nil {
  43. return nil, err
  44. }
  45. if len(password) > 0 {
  46. if _, err := c.Do("AUTH", password); err != nil {
  47. c.Close()
  48. return nil, err
  49. }
  50. } else {
  51. // check with PING
  52. if _, err := c.Do("PING"); err != nil {
  53. c.Close()
  54. return nil, err
  55. }
  56. }
  57. return c, err
  58. },
  59. // custom connection test method
  60. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  61. if _, err := c.Do("PING"); err != nil {
  62. return err
  63. }
  64. return nil
  65. },
  66. }
  67. return MakeRedisCacher(pool, defaultExpiration, logger)
  68. }
  69. // MakeRedisCacher build a cacher based on redis.Pool
  70. func MakeRedisCacher(pool *redis.Pool, defaultExpiration time.Duration, logger log.ContextLogger) *RedisCacher {
  71. return &RedisCacher{pool: pool, defaultExpiration: defaultExpiration, Logger: logger}
  72. }
  73. func exists(conn redis.Conn, key string) bool {
  74. existed, _ := redis.Bool(conn.Do("EXISTS", key))
  75. return existed
  76. }
  77. func (c *RedisCacher) logErrf(format string, contents ...interface{}) {
  78. if c.Logger != nil {
  79. c.Logger.Errorf(fmt.Sprintf("%s %s", LOGGING_PREFIX, format), contents...)
  80. }
  81. }
  82. func (c *RedisCacher) logDebugf(format string, contents ...interface{}) {
  83. if c.Logger != nil {
  84. c.Logger.Debugf(fmt.Sprintf("%s %s", LOGGING_PREFIX, format), contents...)
  85. }
  86. }
  87. func (c *RedisCacher) getBeanKey(tableName string, id string) string {
  88. return fmt.Sprintf("xorm:bean:%s:%s", tableName, id)
  89. }
  90. func (c *RedisCacher) getSqlKey(tableName string, sql string) string {
  91. // hash sql to minimize key length
  92. crc := crc32.ChecksumIEEE([]byte(sql))
  93. return fmt.Sprintf("xorm:sql:%s:%d", tableName, crc)
  94. }
  95. // Flush deletes all xorm cached objects
  96. func (c *RedisCacher) Flush() error {
  97. // conn := c.pool.Get()
  98. // defer conn.Close()
  99. // _, err := conn.Do("FLUSHALL")
  100. // return err
  101. return c.delObject("xorm:*")
  102. }
  103. func (c *RedisCacher) getObject(key string) interface{} {
  104. conn := c.pool.Get()
  105. defer conn.Close()
  106. raw, err := conn.Do("GET", key)
  107. if raw == nil {
  108. return nil
  109. }
  110. item, err := redis.Bytes(raw, err)
  111. if err != nil {
  112. c.logErrf("redis.Bytes failed: %s", err)
  113. return nil
  114. }
  115. value, err := c.deserialize(item)
  116. return value
  117. }
  118. func (c *RedisCacher) GetIds(tableName, sql string) interface{} {
  119. sqlKey := c.getSqlKey(tableName, sql)
  120. c.logDebugf(" GetIds|tableName:%s|sql:%s|key:%s", tableName, sql, sqlKey)
  121. return c.getObject(sqlKey)
  122. }
  123. func (c *RedisCacher) GetBean(tableName string, id string) interface{} {
  124. beanKey := c.getBeanKey(tableName, id)
  125. c.logDebugf("[xorm/redis_cacher] GetBean|tableName:%s|id:%s|key:%s", tableName, id, beanKey)
  126. return c.getObject(beanKey)
  127. }
  128. func (c *RedisCacher) putObject(key string, value interface{}) {
  129. c.invoke(c.pool.Get().Do, key, value, c.defaultExpiration)
  130. }
  131. func (c *RedisCacher) PutIds(tableName, sql string, ids interface{}) {
  132. sqlKey := c.getSqlKey(tableName, sql)
  133. c.logDebugf("PutIds|tableName:%s|sql:%s|key:%s|obj:%s|type:%v", tableName, sql, sqlKey, ids, reflect.TypeOf(ids))
  134. c.putObject(sqlKey, ids)
  135. }
  136. func (c *RedisCacher) PutBean(tableName string, id string, obj interface{}) {
  137. beanKey := c.getBeanKey(tableName, id)
  138. c.logDebugf("PutBean|tableName:%s|id:%s|key:%s|type:%v", tableName, id, beanKey, reflect.TypeOf(obj))
  139. c.putObject(beanKey, obj)
  140. }
  141. func (c *RedisCacher) delObject(key string) error {
  142. c.logDebugf("delObject key:[%s]", key)
  143. conn := c.pool.Get()
  144. defer conn.Close()
  145. if !exists(conn, key) {
  146. c.logErrf("delObject key:[%s] err: %v", key, caches.ErrCacheMiss)
  147. return caches.ErrCacheMiss
  148. }
  149. _, err := conn.Do("DEL", key)
  150. return err
  151. }
  152. func (c *RedisCacher) delObjects(key string) error {
  153. c.logDebugf("delObjects key:[%s]", key)
  154. conn := c.pool.Get()
  155. defer conn.Close()
  156. keys, err := conn.Do("KEYS", key)
  157. c.logDebugf("delObjects keys: %v", keys)
  158. if err == nil {
  159. for _, key := range keys.([]interface{}) {
  160. conn.Do("DEL", key)
  161. }
  162. }
  163. return err
  164. }
  165. func (c *RedisCacher) DelIds(tableName, sql string) {
  166. c.delObject(c.getSqlKey(tableName, sql))
  167. }
  168. func (c *RedisCacher) DelBean(tableName string, id string) {
  169. c.delObject(c.getBeanKey(tableName, id))
  170. }
  171. func (c *RedisCacher) ClearIds(tableName string) {
  172. c.delObjects(fmt.Sprintf("xorm:sql:%s:*", tableName))
  173. }
  174. func (c *RedisCacher) ClearBeans(tableName string) {
  175. c.delObjects(c.getBeanKey(tableName, "*"))
  176. }
  177. func (c *RedisCacher) invoke(f func(string, ...interface{}) (interface{}, error),
  178. key string, value interface{}, expires time.Duration) error {
  179. switch expires {
  180. case DEFAULT_EXPIRATION:
  181. expires = c.defaultExpiration
  182. case FOREVER_EXPIRATION:
  183. expires = time.Duration(0)
  184. }
  185. b, err := c.serialize(value)
  186. if err != nil {
  187. return err
  188. }
  189. conn := c.pool.Get()
  190. defer conn.Close()
  191. if expires > 0 {
  192. _, err := f("SETEX", key, int32(expires/time.Second), b)
  193. return err
  194. } else {
  195. _, err := f("SET", key, b)
  196. return err
  197. }
  198. }
  199. func (c *RedisCacher) serialize(value interface{}) ([]byte, error) {
  200. err := c.registerGobConcreteType(value)
  201. if err != nil {
  202. return nil, err
  203. }
  204. if reflect.TypeOf(value).Kind() == reflect.Struct {
  205. return nil, fmt.Errorf("serialize func only take pointer of a struct")
  206. }
  207. var b bytes.Buffer
  208. encoder := gob.NewEncoder(&b)
  209. c.logDebugf("serialize type:%v", reflect.TypeOf(value))
  210. err = encoder.Encode(&value)
  211. if err != nil {
  212. c.logErrf("gob encoding '%s' failed: %s|value:%v", value, err, value)
  213. return nil, err
  214. }
  215. return b.Bytes(), nil
  216. }
  217. func (c *RedisCacher) deserialize(byt []byte) (ptr interface{}, err error) {
  218. b := bytes.NewBuffer(byt)
  219. decoder := gob.NewDecoder(b)
  220. var p interface{}
  221. err = decoder.Decode(&p)
  222. if err != nil {
  223. c.logErrf("decode failed: %v", err)
  224. return
  225. }
  226. v := reflect.ValueOf(p)
  227. c.logDebugf("deserialize type:%v", v.Type())
  228. if v.Kind() == reflect.Struct {
  229. var pp interface{} = &p
  230. datas := reflect.ValueOf(pp).Elem().InterfaceData()
  231. sp := reflect.NewAt(v.Type(),
  232. unsafe.Pointer(datas[1])).Interface()
  233. ptr = sp
  234. vv := reflect.ValueOf(ptr)
  235. c.logDebugf("deserialize convert ptr type:%v | CanAddr:%t", vv.Type(), vv.CanAddr())
  236. } else {
  237. ptr = p
  238. }
  239. return
  240. }
  241. func (c *RedisCacher) registerGobConcreteType(value interface{}) error {
  242. t := reflect.TypeOf(value)
  243. c.logDebugf("registerGobConcreteType:%v", t)
  244. switch t.Kind() {
  245. case reflect.Ptr:
  246. v := reflect.ValueOf(value)
  247. i := v.Elem().Interface()
  248. gob.Register(&i)
  249. case reflect.Struct, reflect.Map, reflect.Slice:
  250. gob.Register(value)
  251. case reflect.String, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Bool, reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128:
  252. // do nothing since already registered known type
  253. default:
  254. return fmt.Errorf("unhandled type: %v", t)
  255. }
  256. return nil
  257. }
  258. func (c *RedisCacher) GetPool() (*redis.Pool, error) {
  259. return c.pool, nil
  260. }
  261. func (c *RedisCacher) SetPool(pool *redis.Pool) {
  262. c.pool = pool
  263. }