commit 6df18eb79405491b52a0ac1fe68035ed83531b7a Author: Tordarus Date: Fri Jun 6 16:47:33 2025 +0200 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..07973a1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +organizer diff --git a/bytechan_writer.go b/bytechan_writer.go new file mode 100644 index 0000000..d61edc5 --- /dev/null +++ b/bytechan_writer.go @@ -0,0 +1,22 @@ +package main + +import ( + "io" +) + +func NewWriterFromByteChan(ch chan byte) *ByteChanWriter { + return &ByteChanWriter{ch} +} + +type ByteChanWriter struct { + ch chan byte +} + +var _ io.Writer = &ByteChanWriter{} + +func (w *ByteChanWriter) Write(p []byte) (n int, err error) { + for _, b := range p { + w.ch <- b + } + return len(p), nil +} diff --git a/encode_video.go b/encode_video.go new file mode 100644 index 0000000..f5973ec --- /dev/null +++ b/encode_video.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "io" + "os" + "os/exec" + "strings" + "time" +) + +func EncodeVideo(w io.Writer, encArgs, oldFile, newFile string) error { + if encArgs == "" { + fmt.Fprintf(w, "\trename file\n\t from: '%s'\n\t to: '%s'\n", oldFile, newFile) + return os.Rename(oldFile, newFile) + } + + start := time.Now() + + fmt.Fprintf(w, "\tencode file\n\t from: '%s'\n\t to: '%s'\n", oldFile, newFile) + + fullArgs := []string{"-y", "-i", oldFile} + fullArgs = append(fullArgs, strings.Split(encArgs, " ")...) + fullArgs = append(fullArgs, newFile) + + cmd := exec.Command("ffmpeg", fullArgs...) + + if err := cmd.Start(); err != nil { + return err + } + + if err := cmd.Wait(); err != nil { + return err + } + + if err := os.Remove(oldFile); err != nil { + return err + } + + fmt.Fprintf(w, "\t done (took %s)\n", time.Since(start).Truncate(100*time.Millisecond)) + + return nil +} diff --git a/envvars.go b/envvars.go new file mode 100644 index 0000000..4f4001c --- /dev/null +++ b/envvars.go @@ -0,0 +1,52 @@ +package main + +import ( + "os/user" + "runtime" + "strconv" + "text/template" + + "git.tordarus.net/nyaanime/logic" + + "git.tordarus.net/tordarus/envvars" +) + +var ( + DownloadPath = envvars.String("DOWNLOAD_PATH", "") + ThreadCount = envvars.Int("THREADS", runtime.NumCPU()) + DeleteLowPriorityFiles = envvars.Bool("DELETE_LOW_PRIORITY_FILES", false) + + TelegramBotToken = envvars.String("TELEGRAM_API_TOKEN", "") + TelegramChatID = envvars.Int64("TELEGRAM_CHAT_ID", 0) + TelegramOrganizeMessagePatternStr = logic.EscSeqReplacer.Replace(envvars.String("TELEGRAM_ORGANIZE_MESSAGE_PATTERN", `Recently Downloaded Animes ({{len .}} eps){{range .}}\n{{.Anime.Title.UserPreferred}} Episode {{.Episode}}{{end}}`)) + TelegramOrganizeMessagePattern = template.Must(template.New("TELEGRAM_ORGANIZE_MESSAGE_PATTERN").Parse(TelegramOrganizeMessagePatternStr)) + TelegramOrganizeMessageSendCondition = envvars.ObjectSlice("TELEGRAM_ORGANIZE_MESSAGE_SEND_CONDITION", ",", []SendCondition{SendConditionAlways}, SendConditionFromString) + TelegramOrganizeMessageSendInterval = envvars.Duration("TELEGRAM_ORGANIZE_MESSAGE_SEND_INTERVAL", 0) + TelegramOrganizeMessageSendOffset = envvars.Duration("TELEGRAM_ORGANIZE_MESSAGE_SEND_OFFSET", 0) + + Uid = envvars.Object("UID", 1000, func(s string) (int, error) { + if uid, err := strconv.Atoi(s); err == nil { + return uid, nil + } + + usr, err := user.Lookup(s) + if err != nil { + return 0, err + } + + return strconv.Atoi(usr.Uid) + }) + + Gid = envvars.Object("GID", 1000, func(s string) (int, error) { + if gid, err := strconv.Atoi(s); err == nil { + return gid, nil + } + + grp, err := user.LookupGroup(s) + if err != nil { + return 0, err + } + + return strconv.Atoi(grp.Gid) + }) +) diff --git a/file_priority_string.go b/file_priority_string.go new file mode 100644 index 0000000..f57fcc5 --- /dev/null +++ b/file_priority_string.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + "strings" + + "git.tordarus.net/nyaanime/logic" +) + +func FilePrio2Str(fp *logic.FilePriority) string { + return fmt.Sprintf("priority: %d | resolution: %s | languages: %s | subtitles: %s", + fp.Priority, + fp.Properties.GetResolution().String(), + strings.Join(fp.Properties.GetLanguages(), ", "), + strings.Join(fp.Properties.GetSubtitles(), ", "), + ) +} diff --git a/filehandle.go b/filehandle.go new file mode 100644 index 0000000..f5d0475 --- /dev/null +++ b/filehandle.go @@ -0,0 +1,25 @@ +package main + +import ( + "io" +) + +type FileHandle struct { + File string + Chan chan byte + Writer io.Writer +} + +func (fh *FileHandle) Close() error { + close(fh.Chan) + return nil +} + +func NewFileHandle(path string) *FileHandle { + out := make(chan byte, 4096) + return &FileHandle{ + File: path, + Chan: out, + Writer: NewWriterFromByteChan(out), + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..caad0c1 --- /dev/null +++ b/go.mod @@ -0,0 +1,31 @@ +module git.tordarus.net/nyaanime/organizer + +go 1.23 + +toolchain go1.24.3 + +require ( + git.tordarus.net/nyaanime/logic v0.0.1 + git.tordarus.net/nyaanime/model v0.0.1 + git.tordarus.net/nyaanime/parsers v0.0.2 + git.tordarus.net/tordarus/adverr/v2 v2.0.2 + git.tordarus.net/tordarus/anilist v1.5.2 + git.tordarus.net/tordarus/channel v0.1.19 + git.tordarus.net/tordarus/envvars v0.0.0-20250114175450-d73e12b838a5 + git.tordarus.net/tordarus/gmath v0.0.7 + git.tordarus.net/tordarus/slices v0.0.14 + github.com/fatih/color v1.18.0 + github.com/fsnotify/fsnotify v1.9.0 + github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible +) + +require ( + git.tordarus.net/tordarus/tprint v0.0.1 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/technoweenie/multipartstreamer v1.0.1 // indirect + golang.org/x/sys v0.25.0 // indirect + gopkg.in/vansante/go-ffprobe.v2 v2.2.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..06cf520 --- /dev/null +++ b/go.sum @@ -0,0 +1,43 @@ +git.tordarus.net/nyaanime/logic v0.0.1 h1:fUa/O9/WgzJhmGFGSuy6gx+Rr2Y+/RLee3qaNO0l1lI= +git.tordarus.net/nyaanime/logic v0.0.1/go.mod h1:+4sKxSzwgCoeZv3lyWB3yFa5CVyi/frmwsTqDf8C3mE= +git.tordarus.net/nyaanime/model v0.0.1 h1:/I+87Z6eEw/o2adltKnCk4FZai2mPekjYlzEjY1ppyQ= +git.tordarus.net/nyaanime/model v0.0.1/go.mod h1:oHV82UMNy4XgPHkI6tZiwabdi6myqHXgjMi9sNZ+rG4= +git.tordarus.net/nyaanime/parsers v0.0.2 h1:UxfDGxgS2guldLhRtlLNjst/UyeA8OC44oj7nUeRUB8= +git.tordarus.net/nyaanime/parsers v0.0.2/go.mod h1:sx8HyJCpG7zzwRAEE1ZlmVPirPc3fdArlCM5L1sxEaQ= +git.tordarus.net/tordarus/adverr/v2 v2.0.2 h1:7nvNjMMjtGPq0EY6duMiv+seJ7MacNvKSBmckHl6Erg= +git.tordarus.net/tordarus/adverr/v2 v2.0.2/go.mod h1:gCC46KsWosZJh7MVNDEU99hKQoxEWZgHITDHtmFwwiQ= +git.tordarus.net/tordarus/anilist v1.5.2 h1:SxlovS+e3lgL2SowQQwj8dQrIZzRFPomcGCw3V+My0Q= +git.tordarus.net/tordarus/anilist v1.5.2/go.mod h1:Mrhx/9+8HJVj5ebQ5fJuXqL220tEJhgQIqFK2WKPXgA= +git.tordarus.net/tordarus/channel v0.1.19 h1:d9xnSwFyvBh4B1/82mt0A7Gpm2nIZJTc+9ceJMIOu5Q= +git.tordarus.net/tordarus/channel v0.1.19/go.mod h1:8/dWFTdGO7g4AeSZ7cF6GerkGbe9c4dBVMVDBxOd9m4= +git.tordarus.net/tordarus/envvars v0.0.0-20250114175450-d73e12b838a5 h1:rKNDX/YGunqg8TEU6q1rgS2BcDKVmUW2cg61JOE/wws= +git.tordarus.net/tordarus/envvars v0.0.0-20250114175450-d73e12b838a5/go.mod h1:/qVGwrEmqtIrZyuuoIQl4vquSkPWUNJmlGNedDrdYfg= +git.tordarus.net/tordarus/gmath v0.0.7 h1:tR48idt9AUL0r556ww3ZxByTKJEr6NWCTlhl2ihzYxQ= +git.tordarus.net/tordarus/gmath v0.0.7/go.mod h1:mO7aPlvNrGVE9UFXEuuACjZgMDsM63l3OcQy6xSQnoE= +git.tordarus.net/tordarus/slices v0.0.14 h1:Jy1VRMs777WewJ7mxTgjyQIMm/Zr+co18/XoQ01YZ3A= +git.tordarus.net/tordarus/slices v0.0.14/go.mod h1:RgE7A1aSAezIvPUgcbUuMHu0q4xGKoRevT+DC0eJmwI= +git.tordarus.net/tordarus/tprint v0.0.1 h1:aM5c0nLwicUIoic/xguwE5fQdQ2bB3z0+FQEN/Yt0H4= +git.tordarus.net/tordarus/tprint v0.0.1/go.mod h1:2UdHVY/ue8vXeJU/IJY1xBikDaH35kaMzxjk9ryKB8Q= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible h1:2cauKuaELYAEARXRkq2LrJ0yDDv1rW7+wrTEdVL3uaU= +github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible/go.mod h1:qf9acutJ8cwBUhm1bqgz6Bei9/C/c93FPDljKWwsOgM= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM= +github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/vansante/go-ffprobe.v2 v2.2.1 h1:sFV08OT1eZ1yroLCZVClIVd9YySgCh9eGjBWO0oRayI= +gopkg.in/vansante/go-ffprobe.v2 v2.2.1/go.mod h1:qF0AlAjk7Nqzqf3y333Ly+KxN3cKF2JqA3JT5ZheUGE= diff --git a/handle_file.go b/handle_file.go new file mode 100644 index 0000000..c41a953 --- /dev/null +++ b/handle_file.go @@ -0,0 +1,168 @@ +package main + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "git.tordarus.net/nyaanime/logic" + "git.tordarus.net/nyaanime/model" + "git.tordarus.net/nyaanime/parsers" + "git.tordarus.net/tordarus/adverr/v2" + "github.com/fatih/color" +) + +func HandleFile(fh *FileHandle) { + defer fh.Close() + path := fh.File + w := fh.Writer + + fmt.Fprint(w, color.MagentaString("%s file found: %s\n", time.Now().Format("2006-01-02 15:04:05"), path)) + + fmt.Fprint(w, "\ttry parsers: ") + for i, parser := range parsers.Parsers { + parsedFile, ok := parser.FileParser(&parser, path) + if !ok { + fmt.Fprint(w, color.YellowString(parser.Identity)) + if i < len(parsers.Parsers)-1 { + fmt.Fprint(w, ", ") + } else { + fmt.Fprintln(w, color.RedString("\n\tfile ignored")) + } + continue + } + + anime, err := logic.SearchAnimeByTitle(parsedFile.OriginalAnimeTitle) + if err != nil { + fmt.Fprintln(w, color.RedString(parser.Identity)) + fmt.Fprintln(w, color.RedString("\tanime not found: '%s'", parsedFile.OriginalAnimeTitle)) + break + } + + fmt.Fprintln(w, color.GreenString(parser.Identity)) + + parsedFile.Anime = anime + HandleParsedFile(w, parsedFile) + break + } +} + +func HandleParsedFile(w io.Writer, parsedFile *model.ParsedFile) { + newFilePrio := logic.NewFilePriority(parsedFile) + oldFilePrio, animeEpNotExistLocally := logic.GetAnimeEpProps(parsedFile.AnimeEpisode()) + + // debug output + if animeEpNotExistLocally { + fmt.Fprintln(w, "\tfile exists locally") + fmt.Fprintf(w, "\t local file: %s\n", FilePrio2Str(oldFilePrio)) + fmt.Fprintf(w, "\t new file: %s\n", FilePrio2Str(newFilePrio)) + if newFilePrio.Priority > oldFilePrio.Priority { + fmt.Fprint(w, color.GreenString("\t overwrite local file\n")) + } else if !DeleteLowPriorityFiles { + fmt.Fprint(w, color.YellowString("\t ignore new file\n")) + } + } + + // delete files with lower priority from DownloadPath + if DeleteLowPriorityFiles && animeEpNotExistLocally && oldFilePrio.Priority >= newFilePrio.Priority { + fmt.Fprint(w, color.YellowString("\tdelete file with lower priority '%s'", parsedFile.File)) + err := os.Remove(parsedFile.File) + if err != nil { + fmt.Fprint(w, color.RedString(" failed: '%s'\n", err.Error())) + } else { + fmt.Fprint(w, color.GreenString(" done\n")) + } + return + } + + if !animeEpNotExistLocally || newFilePrio.Priority > oldFilePrio.Priority { + DeleteOldAnimeEpisode(parsedFile.AnimeEpisode()) + if err := OrganizeAnimeEpisode(w, parsedFile); err != nil { + fmt.Fprint(w, color.RedString("\terror: %s\n", err.Error())) + } + } +} + +func OrganizeAnimeEpisode(w io.Writer, parsedFile *model.ParsedFile) error { + PrepareTelegramAnimeEpMessage(parsedFile.AnimeEpisode()) + + start := time.Now() + + oldFile := parsedFile.File + newFile := logic.GetAnimeEpFilepath(parsedFile.AnimeEpisode(), "part") + encodedFile := logic.GetAnimeEpFilepath(parsedFile.AnimeEpisode(), filepath.Ext(parsedFile.File)) + lockFile := logic.GetAnimeEpFilepath(parsedFile.AnimeEpisode(), "lock") + + fmt.Fprintf(w, "\tmove file\n\t from: '%s'\n\t to: '%s'\n", oldFile, newFile) + + if err := os.MkdirAll(filepath.Dir(newFile), os.ModePerm); err != nil { + return err + } + + if err := os.Chown(filepath.Dir(newFile), Uid, Gid); err != nil { + return err + } + + if _, err := os.Stat(newFile); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + + inputFile, err := os.Open(oldFile) + if err != nil { + return err + } + defer inputFile.Close() + + outputFile, err := os.Create(newFile) + if err != nil { + return err + } + defer outputFile.Close() + + if err := os.Chown(newFile, Uid, Gid); err != nil { + return err + } + + written, err := io.Copy(outputFile, inputFile) + if err != nil { + return err + } + + fmt.Fprintf(w, "\t done (copied %s in %s)\n", FormatBytes(written), time.Since(start).Truncate(100*time.Millisecond)) + + // TODO Video encoding should be done before comparing priorities. merge in downloader repo? + if err := EncodeVideo(w, parsedFile.Parser.FileEncoding, newFile, encodedFile); err != nil { + return err + } + + if err := os.Remove(oldFile); err != nil { + return err + } + + if err := os.Remove(lockFile); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + + return nil +} + +func DeleteOldAnimeEpisode(animeEp model.AnimeEpisode) { + animeEpPath := logic.GetAnimeEpFilepath(animeEp, "*") + files, err := filepath.Glob(animeEpPath) + if err != nil { + panic(logic.ErrInvalidGlobSyntax.Wrap(err, animeEpPath)) + } + + for _, file := range files { + if filepath.Ext(file) == ".lock" { + continue + } + + if err := os.Remove(file); err != nil { + adverr.Println(err) + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..3dd3359 --- /dev/null +++ b/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "os" + "os/exec" + "os/signal" + "sync" + "syscall" + + "git.tordarus.net/nyaanime/logic" + "git.tordarus.net/tordarus/channel" + "github.com/fsnotify/fsnotify" +) + +var ( + // AppCtx notifies all threads to finish their work and shutdown + AppCtx, cancelAppCtx = context.WithCancel(context.Background()) + // AppExitWg is waiting for all threads until they are done + AppExitWg = &sync.WaitGroup{} + + Runner = InitializeRunner() +) + +func main() { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT) + go func() { + <-signalChan + exit() + }() + + // check for ffprobe in PATH + if _, err := exec.LookPath("ffprobe"); err != nil { + panic(err) // TODO error handling + } + + // get access token once at startup to be sure that an access token is obtainable at all + if _, err := logic.GetAnilistAccessToken(); err != nil { + panic(err) // TODO error handling + } + + if err := InitTelegramBot(); err != nil { + panic(err) // TODO error handling + } + + logic.PrintPriorityTables() + + fileChan, err := WatchDirectory(fsnotify.Create, DownloadPath) + if err != nil { + panic(err) // TODO error handling + } + + fileHandleChan := channel.Map(fileChan, NewFileHandle) + workChan, logChan := channel.Tee(fileHandleChan) + + AppExitWg.Add(1) + go func() { + defer AppExitWg.Done() + channel.Each(workChan, HandleFileInRunner) + }() + + channel.Each(logChan, PrintFileHandle) +} + +func HandleFileInRunner(fh *FileHandle) { + Runner.Run(func() { HandleFile(fh) }) +} + +func exit() { + cancelAppCtx() // notify all threads to shutdown + AppExitWg.Wait() // wait for threads until shutdown +} diff --git a/send_condition.go b/send_condition.go new file mode 100644 index 0000000..3632c5c --- /dev/null +++ b/send_condition.go @@ -0,0 +1,45 @@ +package main + +import ( + "errors" + "strings" + + "git.tordarus.net/nyaanime/model" + "git.tordarus.net/tordarus/anilist" +) + +type SendCondition string + +var AllSendConditions = []SendCondition{SendConditionOnList, SendConditionAlways, SendConditionPlanned, SendConditionNotWatched} + +const ( + SendConditionOnList = "ON_LIST" + SendConditionAlways = "ALWAYS" + SendConditionPlanned = "PLANNED" + SendConditionNotWatched = "NOT_WATCHED" +) + +func SendConditionFromString(str string) (SendCondition, error) { + str = strings.ToLower(str) + for _, condition := range AllSendConditions { + if str == strings.ToLower(string(condition)) { + return condition, nil + } + } + return SendCondition(""), errors.New("invalid message send condition") +} + +func (c SendCondition) ShouldSend(animeEp model.AnimeEpisode, listEntry *anilist.MediaList) bool { + switch c { + case SendConditionOnList: + return listEntry != nil + case SendConditionPlanned: + return listEntry != nil && listEntry.Status == anilist.MediaListStatusPlanning + case SendConditionNotWatched: + return listEntry != nil && listEntry.Progress < animeEp.Episode + case SendConditionAlways: + return true + default: + panic("invalid telegram message send condition") + } +} diff --git a/telegram.go b/telegram.go new file mode 100644 index 0000000..346e206 --- /dev/null +++ b/telegram.go @@ -0,0 +1,165 @@ +package main + +import ( + "context" + "strings" + + "git.tordarus.net/nyaanime/logic" + "git.tordarus.net/nyaanime/model" + "git.tordarus.net/tordarus/adverr/v2" + "git.tordarus.net/tordarus/anilist" + "git.tordarus.net/tordarus/channel" + "git.tordarus.net/tordarus/slices" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" +) + +var TelegramBot *tgbotapi.BotAPI +var AnimeEpisodeMessageChannel = make(chan model.AnimeEpisode, 1000) + +func InitTelegramBot() error { + if TelegramBotToken != "" && TelegramChatID != 0 { + bot, err := tgbotapi.NewBotAPI(TelegramBotToken) + if err != nil { + return err + } + TelegramBot = bot + } + + // close channel on app shutdown + go func() { + <-AppCtx.Done() + close(AnimeEpisodeMessageChannel) + }() + + AppExitWg.Add(1) + go SendMessagePeriodically() + + return nil +} + +func SendMessagePeriodically() { + defer AppExitWg.Done() + + var messagesPerInterval <-chan []model.AnimeEpisode + + if TelegramOrganizeMessageSendInterval > 0 { + WaitForNextTelegramSendCycle() + sendAllQueuedAnimeEpisodes() + + grouperFunc := func(current []model.AnimeEpisode, value model.AnimeEpisode) []model.AnimeEpisode { + return append(current, value) + } + messagesPerInterval = channel.GroupByTime(AnimeEpisodeMessageChannel, TelegramOrganizeMessageSendInterval, grouperFunc) + } else { + mapperFunc := func(value model.AnimeEpisode) []model.AnimeEpisode { + return []model.AnimeEpisode{value} + } + messagesPerInterval = channel.MapSuccessive(AnimeEpisodeMessageChannel, mapperFunc) + } + + for animeEpisodes := range messagesPerInterval { + sendTelegramAnimeEpMessage(animeEpisodes) + } +} + +func sendAllQueuedAnimeEpisodes() { + queuedEpisodeAmount := len(AnimeEpisodeMessageChannel) + animeEpisodes := make([]model.AnimeEpisode, 0, queuedEpisodeAmount) + for i := 0; i < queuedEpisodeAmount; i++ { + animeEpisodes = append(animeEpisodes, <-AnimeEpisodeMessageChannel) + } + sendTelegramAnimeEpMessage(animeEpisodes) +} + +func SendTelegramMessage(text string) { + if TelegramBot == nil || strings.TrimSpace(text) == "" { + return + } + + msg := tgbotapi.NewMessage(TelegramChatID, text) + msg.ParseMode = "html" + _, err := TelegramBot.Send(msg) + if err != nil { + adverr.Println(adverr.Wrap("could not send telegram message", err)) + } +} + +func PrepareTelegramAnimeEpMessage(animeEp model.AnimeEpisode) { + shouldSendMessage, err := CheckSendConditions(animeEp) + if err != nil { + adverr.Println(adverr.Wrap("could not check telegram message send conditions", err)) + } + + if !shouldSendMessage { + return + } + + AnimeEpisodeMessageChannel <- animeEp +} + +func sendTelegramAnimeEpMessage(animeEpisodes []model.AnimeEpisode) { + if len(animeEpisodes) == 0 { + return + } + + animeEpisodes = RemoveDuplicates(animeEpisodes) + + b := new(strings.Builder) + if err := TelegramOrganizeMessagePattern.Execute(b, animeEpisodes); err != nil { + adverr.Println(adverr.Wrap("could not send telegram message", err)) + } + + SendTelegramMessage(b.String()) +} + +func RemoveDuplicates(animeEpisodes []model.AnimeEpisode) []model.AnimeEpisode { + mapperFunc := func(animeEp model.AnimeEpisode) (model.Pair[anilist.MediaID, int], model.AnimeEpisode) { + return model.Pair[anilist.MediaID, int]{First: animeEp.Anime.ID, Second: animeEp.Episode}, animeEp + } + + unmapperFunc := func(key model.Pair[anilist.MediaID, int], value model.AnimeEpisode) model.AnimeEpisode { + return value + } + + return slices.OfMap(slices.ToMap(animeEpisodes, mapperFunc), unmapperFunc) +} + +func CheckSendConditions(animeEp model.AnimeEpisode) (bool, error) { + listEntry, err := GetListEntry(animeEp.Anime) + if err != nil { + return false, err + } + + for _, sendCondition := range AllSendConditions { + // check if user configured current sendCondition + if !slices.Contains(TelegramOrganizeMessageSendCondition, sendCondition) { + continue + } + + // check if current sendCondition applies for given anime episode + if sendCondition.ShouldSend(animeEp, listEntry) { + return true, nil + } + } + + return false, nil +} + +func GetListEntry(anime *anilist.Media) (*anilist.MediaList, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + listEntries, err := logic.GetCurrentlyWatchingAnimesContext(ctx, logic.AllMediaListStatuses...) + if err != nil { + return nil, err + } + + filteredListEntries := channel.Filter(listEntries, func(a *anilist.MediaList) bool { return a.MediaID == anime.ID }) + listEntry := channel.FindFirstAndCancelFlush(filteredListEntries, cancel) // TODO flush working properly? + + if listEntry == nil { + return nil, nil + } + + return *listEntry, nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..65a2f2a --- /dev/null +++ b/utils.go @@ -0,0 +1,65 @@ +package main + +import ( + "fmt" + "io" + "os" + "time" + + "git.tordarus.net/tordarus/channel" + "git.tordarus.net/tordarus/gmath" +) + +func InitializeRunner() channel.Runner { + if ThreadCount <= 0 { + return channel.NewUnlimitedRunner() + } + return channel.NewLimitedRunner(ThreadCount) +} + +func FormatBytes[T gmath.Integer](bytes T) string { + value := float64(bytes) + + if value >= 1000000000000 { + return fmt.Sprintf("%.02fT", value/1000000000000) + } else if value >= 1000000000 { + return fmt.Sprintf("%.02fG", value/1000000000) + } else if value >= 1000000 { + return fmt.Sprintf("%.02fM", value/1000000) + } else if value >= 1000 { + return fmt.Sprintf("%.02fK", value/1000) + } else { + return fmt.Sprintf("%.02fB", value) + } +} + +func PrintByteChan(w io.Writer, ch <-chan byte) error { + for b := range ch { + if _, err := w.Write([]byte{b}); err != nil { + return err + } + } + + fmt.Fprintln(w) + return nil +} + +func PrintFileHandle(fh *FileHandle) { + if err := PrintByteChan(os.Stdout, fh.Chan); err != nil { + panic(err) // TODO error handling + } +} + +func WaitForNextTelegramSendCycle() { + now := time.Now() + _, offset := now.Zone() + offsetDuration := time.Duration(offset) * time.Second + lastCycle := now.Truncate(TelegramOrganizeMessageSendInterval).Add(-offsetDuration).Add(TelegramOrganizeMessageSendOffset) + + if durationUntilLastCycle := time.Until(lastCycle); durationUntilLastCycle > 0 { + time.Sleep(durationUntilLastCycle) + } else { + nextCycle := lastCycle.Add(TelegramOrganizeMessageSendInterval) + time.Sleep(time.Until(nextCycle)) + } +} diff --git a/watch_directory.go b/watch_directory.go new file mode 100644 index 0000000..671b9d7 --- /dev/null +++ b/watch_directory.go @@ -0,0 +1,95 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/fsnotify/fsnotify" +) + +func WatchDirectory(op fsnotify.Op, path string) (<-chan string, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + panic(err) // TODO error handling + } + + err = watcher.Add(path) + if err != nil { + return nil, err + } + + files, err := os.ReadDir(path) + if err != nil { + return nil, err + } + + ch := make(chan string, 10) + + // close channel on app shutdown + go func() { + <-AppCtx.Done() + watcher.Close() + }() + + AppExitWg.Add(1) + go func(watcher *fsnotify.Watcher, ch chan<- string) { + defer AppExitWg.Done() + defer watcher.Close() + + for _, file := range files { + path := filepath.Join(path, file.Name()) + if file.IsDir() { + //fmt.Println("watching directory", path) + watcher.Add(path) + + if dirFiles, err := os.ReadDir(path); err == nil { + for _, dirFile := range dirFiles { + ch <- filepath.Join(path, dirFile.Name()) + } + } + } else { + ch <- path + } + } + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + close(ch) + return + } + + if fi, err := os.Stat(event.Name); err == nil && fi.IsDir() { + if event.Op&fsnotify.Create == fsnotify.Create { + //fmt.Println("watching directory", event.Name) + watcher.Add(event.Name) + } + + // read dir immediately because directory files could simultanously with its parent directory + if dirFiles, err := os.ReadDir(event.Name); err == nil { + for _, dirFile := range dirFiles { + ch <- filepath.Join(event.Name, dirFile.Name()) + } + } + } else if err != nil && event.Op&fsnotify.Remove == fsnotify.Remove { + //fmt.Println("stopped watching directory", event.Name) + watcher.Remove(event.Name) + } + + if event.Op&op == op { + ch <- event.Name + } + case err, ok := <-watcher.Errors: + if ok { + fmt.Println(err, ok) + } + close(ch) + return + } + } + }(watcher, ch) + + return ch, nil +}