| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
 | /*
   GoToSocial
   Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU Affero General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU Affero General Public License for more details.
   You should have received a copy of the GNU Affero General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
package media
import (
	"context"
	"fmt"
	"time"
	"github.com/robfig/cron/v3"
	"github.com/superseriousbusiness/gotosocial/internal/concurrency"
	"github.com/superseriousbusiness/gotosocial/internal/config"
	"github.com/superseriousbusiness/gotosocial/internal/db"
	"github.com/superseriousbusiness/gotosocial/internal/log"
	"github.com/superseriousbusiness/gotosocial/internal/storage"
)
// selectPruneLimit is the amount of media entries to select at a time from the db when pruning
const selectPruneLimit = 20
// UnusedLocalAttachmentCacheDays is the amount of days to keep local media in storage if it
// is not attached to a status, or was never attached to a status.
const UnusedLocalAttachmentCacheDays = 3
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface {
	// Stop stops the underlying worker pool of the manager. It should be called
	// when closing GoToSocial in order to cleanly finish any in-progress jobs.
	// It will block until workers are finished processing.
	Stop() error
	/*
		PROCESSING FUNCTIONS
	*/
	// ProcessMedia begins the process of decoding and storing the given data as an attachment.
	// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
	// the finished media, thumbnail, attachment, etc.
	//
	// data should be a function that the media manager can call to return a reader containing the media data.
	//
	// postData will be called after data has been called; it can be used to clean up any remaining resources.
	// The provided function can be nil, in which case it will not be executed.
	//
	// accountID should be the account that the media belongs to.
	//
	// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
	ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
	// ProcessEmoji begins the process of decoding and storing the given data as an emoji.
	// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
	// the finished media, thumbnail, attachment, etc.
	//
	// data should be a function that the media manager can call to return a reader containing the emoji data.
	//
	// postData will be called after data has been called; it can be used to clean up any remaining resources.
	// The provided function can be nil, in which case it will not be executed.
	//
	// shortcode should be the emoji shortcode without the ':'s around it.
	//
	// id is the database ID that should be used to store the emoji.
	//
	// uri is the ActivityPub URI/ID of the emoji.
	//
	// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
	//
	// If refresh is true, this indicates that the emoji image has changed and should be updated.
	ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
	// RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
	RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
	/*
		PRUNING FUNCTIONS
	*/
	// PruneAllRemote prunes all remote media attachments cached on this instance which are older than the given amount of days.
	// 'Pruning' in this context means removing the locally stored data of the attachment (both thumbnail and full size),
	// and setting 'cached' to false on the associated attachment.
	//
	// The returned int is the amount of media that was pruned by this function.
	PruneAllRemote(ctx context.Context, olderThanDays int) (int, error)
	// PruneAllMeta prunes unused/out of date headers and avatars cached on this instance.
	//
	// The returned int is the amount of media that was pruned by this function.
	PruneAllMeta(ctx context.Context) (int, error)
	// PruneUnusedLocalAttachments prunes unused media attachments that were uploaded by
	// a user on this instance, but never actually attached to a status, or attached but
	// later detached.
	//
	// The returned int is the amount of media that was pruned by this function.
	PruneUnusedLocalAttachments(ctx context.Context) (int, error)
	// PruneOrphaned prunes files that exist in storage but which do not have a corresponding
	// entry in the database.
	//
	// If dry is true, then nothing will be changed, only the amount that *would* be removed
	// is returned to the caller.
	PruneOrphaned(ctx context.Context, dry bool) (int, error)
	/*
		REFETCHING FUNCTIONS
		Useful when data loss has occurred.
	*/
	// RefetchEmojis iterates through remote emojis (for the given domain, or all if domain is empty string).
	//
	// For each emoji, the manager will check whether both the full size and static images are present in storage.
	// If not, the manager will refetch and reprocess full size and static images for the emoji.
	//
	// The provided DereferenceMedia function will be used when it's necessary to refetch something this way.
	RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error)
}
type manager struct {
	db           db.DB
	storage      *storage.Driver
	emojiWorker  *concurrency.WorkerPool[*ProcessingEmoji]
	mediaWorker  *concurrency.WorkerPool[*ProcessingMedia]
	stopCronJobs func() error
}
// NewManager returns a media manager with the given db and underlying storage.
//
// A worker pool will also be initialized for the manager, to ensure that only
// a limited number of media will be processed in parallel. The numbers of workers
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
// See internal/concurrency.NewWorkerPool() documentation for further information.
func NewManager(database db.DB, storage *storage.Driver) (Manager, error) {
	m := &manager{
		db:      database,
		storage: storage,
	}
	// Prepare the media worker pool
	m.mediaWorker = concurrency.NewWorkerPool[*ProcessingMedia](-1, 10)
	m.mediaWorker.SetProcessor(func(ctx context.Context, media *ProcessingMedia) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		if _, err := media.LoadAttachment(ctx); err != nil {
			return fmt.Errorf("error loading media %s: %v", media.AttachmentID(), err)
		}
		return nil
	})
	// Prepare the emoji worker pool
	m.emojiWorker = concurrency.NewWorkerPool[*ProcessingEmoji](-1, 10)
	m.emojiWorker.SetProcessor(func(ctx context.Context, emoji *ProcessingEmoji) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		if _, err := emoji.LoadEmoji(ctx); err != nil {
			return fmt.Errorf("error loading emoji %s: %v", emoji.EmojiID(), err)
		}
		return nil
	})
	// Start the worker pools
	if err := m.mediaWorker.Start(); err != nil {
		return nil, err
	}
	if err := m.emojiWorker.Start(); err != nil {
		return nil, err
	}
	if err := scheduleCleanupJobs(m); err != nil {
		return nil, err
	}
	return m, nil
}
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
	processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai)
	if err != nil {
		return nil, err
	}
	m.mediaWorker.Queue(processingMedia)
	return processingMedia, nil
}
func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
	processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh)
	if err != nil {
		return nil, err
	}
	m.emojiWorker.Queue(processingEmoji)
	return processingEmoji, nil
}
func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
	processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID)
	if err != nil {
		return nil, err
	}
	m.mediaWorker.Queue(processingRecache)
	return processingRecache, nil
}
func (m *manager) Stop() error {
	// Stop media and emoji worker pools
	mediaErr := m.mediaWorker.Stop()
	emojiErr := m.emojiWorker.Stop()
	var cronErr error
	if m.stopCronJobs != nil {
		cronErr = m.stopCronJobs()
	}
	if mediaErr != nil {
		return mediaErr
	} else if emojiErr != nil {
		return emojiErr
	}
	return cronErr
}
func scheduleCleanupJobs(m *manager) error {
	// create a new cron instance for scheduling cleanup jobs
	c := cron.New(cron.WithLogger(&logrusWrapper{}))
	pruneCtx, pruneCancel := context.WithCancel(context.Background())
	if _, err := c.AddFunc("@midnight", func() {
		begin := time.Now()
		pruned, err := m.PruneAllMeta(pruneCtx)
		if err != nil {
			log.Errorf("media manager: error pruning meta: %s", err)
			return
		}
		log.Infof("media manager: pruned %d meta entries in %s", pruned, time.Since(begin))
	}); err != nil {
		pruneCancel()
		return fmt.Errorf("error starting media manager meta cleanup job: %s", err)
	}
	if _, err := c.AddFunc("@midnight", func() {
		begin := time.Now()
		pruned, err := m.PruneUnusedLocalAttachments(pruneCtx)
		if err != nil {
			log.Errorf("media manager: error pruning unused local attachments: %s", err)
			return
		}
		log.Infof("media manager: pruned %d unused local attachments in %s", pruned, time.Since(begin))
	}); err != nil {
		pruneCancel()
		return fmt.Errorf("error starting media manager unused local attachments cleanup job: %s", err)
	}
	// start remote cache cleanup cronjob if configured
	if mediaRemoteCacheDays := config.GetMediaRemoteCacheDays(); mediaRemoteCacheDays > 0 {
		if _, err := c.AddFunc("@midnight", func() {
			begin := time.Now()
			pruned, err := m.PruneAllRemote(pruneCtx, mediaRemoteCacheDays)
			if err != nil {
				log.Errorf("media manager: error pruning remote cache: %s", err)
				return
			}
			log.Infof("media manager: pruned %d remote cache entries in %s", pruned, time.Since(begin))
		}); err != nil {
			pruneCancel()
			return fmt.Errorf("error starting media manager remote cache cleanup job: %s", err)
		}
	}
	// try to stop any jobs gracefully by waiting til they're finished
	m.stopCronJobs = func() error {
		cronCtx := c.Stop()
		select {
		case <-cronCtx.Done():
			log.Infof("media manager: cron finished jobs and stopped gracefully")
		case <-time.After(1 * time.Minute):
			log.Infof("media manager: cron didn't stop after 60 seconds, will force close jobs")
			break
		}
		pruneCancel()
		return nil
	}
	c.Start()
	return nil
}
 |