Simple and Powerful ORM for Go, support mysql,postgres,tidb,sqlite3,mssql,oracle https://xorm.io
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

247 lines
6.7KB

  1. // Copyright 2016 The Xorm Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package xorm
  5. import (
  6. "errors"
  7. "fmt"
  8. "strconv"
  9. "xorm.io/core"
  10. )
  11. func (session *Session) cacheDelete(table *core.Table, tableName, sqlStr string, args ...interface{}) error {
  12. if table == nil ||
  13. session.tx != nil {
  14. return ErrCacheFailed
  15. }
  16. for _, filter := range session.engine.dialect.Filters() {
  17. sqlStr = filter.Do(sqlStr, session.engine.dialect, table)
  18. }
  19. newsql := session.statement.convertIDSQL(sqlStr)
  20. if newsql == "" {
  21. return ErrCacheFailed
  22. }
  23. cacher := session.engine.getCacher(tableName)
  24. pkColumns := table.PKColumns()
  25. ids, err := core.GetCacheSql(cacher, tableName, newsql, args)
  26. if err != nil {
  27. resultsSlice, err := session.queryBytes(newsql, args...)
  28. if err != nil {
  29. return err
  30. }
  31. ids = make([]core.PK, 0)
  32. if len(resultsSlice) > 0 {
  33. for _, data := range resultsSlice {
  34. var id int64
  35. var pk core.PK = make([]interface{}, 0)
  36. for _, col := range pkColumns {
  37. if v, ok := data[col.Name]; !ok {
  38. return errors.New("no id")
  39. } else if col.SQLType.IsText() {
  40. pk = append(pk, string(v))
  41. } else if col.SQLType.IsNumeric() {
  42. id, err = strconv.ParseInt(string(v), 10, 64)
  43. if err != nil {
  44. return err
  45. }
  46. pk = append(pk, id)
  47. } else {
  48. return errors.New("not supported primary key type")
  49. }
  50. }
  51. ids = append(ids, pk)
  52. }
  53. }
  54. }
  55. for _, id := range ids {
  56. session.engine.logger.Debug("[cacheDelete] delete cache obj:", tableName, id)
  57. sid, err := id.ToString()
  58. if err != nil {
  59. return err
  60. }
  61. cacher.DelBean(tableName, sid)
  62. }
  63. session.engine.logger.Debug("[cacheDelete] clear cache table:", tableName)
  64. cacher.ClearIds(tableName)
  65. return nil
  66. }
  67. // Delete records, bean's non-empty fields are conditions
  68. func (session *Session) Delete(bean interface{}) (int64, error) {
  69. if session.isAutoClose {
  70. defer session.Close()
  71. }
  72. if session.statement.lastError != nil {
  73. return 0, session.statement.lastError
  74. }
  75. if err := session.statement.setRefBean(bean); err != nil {
  76. return 0, err
  77. }
  78. // handle before delete processors
  79. for _, closure := range session.beforeClosures {
  80. closure(bean)
  81. }
  82. cleanupProcessorsClosures(&session.beforeClosures)
  83. if processor, ok := interface{}(bean).(BeforeDeleteProcessor); ok {
  84. processor.BeforeDelete()
  85. }
  86. condSQL, condArgs, err := session.statement.genConds(bean)
  87. if err != nil {
  88. return 0, err
  89. }
  90. pLimitN := session.statement.LimitN
  91. if len(condSQL) == 0 && (pLimitN == nil || *pLimitN == 0) {
  92. return 0, ErrNeedDeletedCond
  93. }
  94. var tableNameNoQuote = session.statement.TableName()
  95. var tableName = session.engine.Quote(tableNameNoQuote)
  96. var table = session.statement.RefTable
  97. var deleteSQL string
  98. if len(condSQL) > 0 {
  99. deleteSQL = fmt.Sprintf("DELETE FROM %v WHERE %v", tableName, condSQL)
  100. } else {
  101. deleteSQL = fmt.Sprintf("DELETE FROM %v", tableName)
  102. }
  103. var orderSQL string
  104. if len(session.statement.OrderStr) > 0 {
  105. orderSQL += fmt.Sprintf(" ORDER BY %s", session.statement.OrderStr)
  106. }
  107. if pLimitN != nil && *pLimitN > 0 {
  108. limitNValue := *pLimitN
  109. orderSQL += fmt.Sprintf(" LIMIT %d", limitNValue)
  110. }
  111. if len(orderSQL) > 0 {
  112. switch session.engine.dialect.DBType() {
  113. case core.POSTGRES:
  114. inSQL := fmt.Sprintf("ctid IN (SELECT ctid FROM %s%s)", tableName, orderSQL)
  115. if len(condSQL) > 0 {
  116. deleteSQL += " AND " + inSQL
  117. } else {
  118. deleteSQL += " WHERE " + inSQL
  119. }
  120. case core.SQLITE:
  121. inSQL := fmt.Sprintf("rowid IN (SELECT rowid FROM %s%s)", tableName, orderSQL)
  122. if len(condSQL) > 0 {
  123. deleteSQL += " AND " + inSQL
  124. } else {
  125. deleteSQL += " WHERE " + inSQL
  126. }
  127. // TODO: how to handle delete limit on mssql?
  128. case core.MSSQL:
  129. return 0, ErrNotImplemented
  130. default:
  131. deleteSQL += orderSQL
  132. }
  133. }
  134. var realSQL string
  135. argsForCache := make([]interface{}, 0, len(condArgs)*2)
  136. if session.statement.unscoped || table.DeletedColumn() == nil { // tag "deleted" is disabled
  137. realSQL = deleteSQL
  138. copy(argsForCache, condArgs)
  139. argsForCache = append(condArgs, argsForCache...)
  140. } else {
  141. // !oinume! sqlStrForCache and argsForCache is needed to behave as executing "DELETE FROM ..." for cache.
  142. copy(argsForCache, condArgs)
  143. argsForCache = append(condArgs, argsForCache...)
  144. deletedColumn := table.DeletedColumn()
  145. realSQL = fmt.Sprintf("UPDATE %v SET %v = ? WHERE %v",
  146. session.engine.Quote(session.statement.TableName()),
  147. session.engine.Quote(deletedColumn.Name),
  148. condSQL)
  149. if len(orderSQL) > 0 {
  150. switch session.engine.dialect.DBType() {
  151. case core.POSTGRES:
  152. inSQL := fmt.Sprintf("ctid IN (SELECT ctid FROM %s%s)", tableName, orderSQL)
  153. if len(condSQL) > 0 {
  154. realSQL += " AND " + inSQL
  155. } else {
  156. realSQL += " WHERE " + inSQL
  157. }
  158. case core.SQLITE:
  159. inSQL := fmt.Sprintf("rowid IN (SELECT rowid FROM %s%s)", tableName, orderSQL)
  160. if len(condSQL) > 0 {
  161. realSQL += " AND " + inSQL
  162. } else {
  163. realSQL += " WHERE " + inSQL
  164. }
  165. // TODO: how to handle delete limit on mssql?
  166. case core.MSSQL:
  167. return 0, ErrNotImplemented
  168. default:
  169. realSQL += orderSQL
  170. }
  171. }
  172. // !oinume! Insert nowTime to the head of session.statement.Params
  173. condArgs = append(condArgs, "")
  174. paramsLen := len(condArgs)
  175. copy(condArgs[1:paramsLen], condArgs[0:paramsLen-1])
  176. val, t := session.engine.nowTime(deletedColumn)
  177. condArgs[0] = val
  178. var colName = deletedColumn.Name
  179. session.afterClosures = append(session.afterClosures, func(bean interface{}) {
  180. col := table.GetColumn(colName)
  181. setColumnTime(bean, col, t)
  182. })
  183. }
  184. if cacher := session.engine.getCacher(tableNameNoQuote); cacher != nil && session.statement.UseCache {
  185. session.cacheDelete(table, tableNameNoQuote, deleteSQL, argsForCache...)
  186. }
  187. session.statement.RefTable = table
  188. res, err := session.exec(realSQL, condArgs...)
  189. if err != nil {
  190. return 0, err
  191. }
  192. // handle after delete processors
  193. if session.isAutoCommit {
  194. for _, closure := range session.afterClosures {
  195. closure(bean)
  196. }
  197. if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok {
  198. processor.AfterDelete()
  199. }
  200. } else {
  201. lenAfterClosures := len(session.afterClosures)
  202. if lenAfterClosures > 0 {
  203. if value, has := session.afterDeleteBeans[bean]; has && value != nil {
  204. *value = append(*value, session.afterClosures...)
  205. } else {
  206. afterClosures := make([]func(interface{}), lenAfterClosures)
  207. copy(afterClosures, session.afterClosures)
  208. session.afterDeleteBeans[bean] = &afterClosures
  209. }
  210. } else {
  211. if _, ok := interface{}(bean).(AfterDeleteProcessor); ok {
  212. session.afterDeleteBeans[bean] = nil
  213. }
  214. }
  215. }
  216. cleanupProcessorsClosures(&session.afterClosures)
  217. // --
  218. return res.RowsAffected()
  219. }