# HG changeset patch # User Sunil Nimmagadda # Date 1547111408 -18030 # Node ID 7671ae88de2ab14e91e1b7d8f2061912ff30922f Initial import. diff -r 000000000000 -r 7671ae88de2a filter_rspamd.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/filter_rspamd.go Thu Jan 10 14:10:38 2019 +0500 @@ -0,0 +1,238 @@ +// Copyright (c) 2019 Sunil Nimmagadda +// +// Permission to use, copy, modify, and distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "log" + "net/http" + "net/mail" + "os" + "strings" +) + +const rspamdURL = "http://localhost:11333/checkv2" + +var stdout *log.Logger + +type session struct { + ch <-chan string + control map[string]string + id string + payload *strings.Builder +} + +type rspamdResponse struct { + Score float32 + RequiredScore float32 `json:"required_score"` + Subject string + Action string + DKIMSig string `json:"dkim-signature"` +} + +func linkConnect(s *session, args []string) { + rdns, laddr := args[6], args[8] + s.control["Pass"] = "all" + p := strings.Split(laddr, ":") + if p[0] != "local" { + s.control["Ip"] = p[0] + } + if rdns != "" { + s.control["Hostname"] = rdns + } +} + +func linkIdentify(s *session, args []string) { + s.control["Helo"] = args[6] +} + +func txBegin(s *session, args []string) { + s.control["Queue-Id"] = args[6] +} + +func txMail(s *session, args []string) { + mail_from, status := args[7], args[8] + if status == "ok" { + s.control["From"] = mail_from + } +} + +func txRcpt(s *session, args []string) { + rcpt_to, status := args[7], args[8] + if status == "ok" { + s.control["Rcpt"] = rcpt_to + } +} + +func txData(s *session, args []string) { + status := args[7] + if status == "ok" { + s.control = nil + } +} + +func txCleanup(s *session, args []string) { + s.control = nil +} + +func filterCommit(s *session, args []string) { + token := args[5] + reason := <-s.ch + if reason != "" { + stdout.Printf("filter-result|%s|%s|reject|%s\n", + token, s.id, reason) + return + } + stdout.Printf("filter-result|%s|%s|proceed\n", token, s.id) +} + +func filterDataLine(s *session, args []string) { + token, line := args[5], args[7] + if line != "." { + s.payload.WriteString(line) + s.payload.WriteString("\n") + return + } + s.ch = dataOutput(s.control, token, s.id, s.payload.String()) +} + +func rspamdPost(hdrs map[string]string, data string) (*rspamdResponse, error) { + r := strings.NewReader(data) + client := &http.Client{} + req, err := http.NewRequest("POST", rspamdURL, r) + if err != nil { + return nil, err + } + for k, v := range hdrs { + req.Header.Add(k, v) + } + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + rr := &rspamdResponse{} + if err := json.NewDecoder(resp.Body).Decode(rr); err != nil { + return nil, err + } + return rr, nil +} + +func dataOutput(headers map[string]string, + token, id, data string) <-chan string { + ch := make(chan string) + go func() { + resp, err := rspamdPost(headers, data) + if err != nil { + log.Fatal(err) + } + log.Printf("%v\n", resp) + m, err := mail.ReadMessage(strings.NewReader(data)) + if err != nil { + log.Fatal(err) + } + rejectReason := "" + switch resp.Action { + case "add header": + m.Header["X-Spam"] = []string{"yes"} + m.Header["X-Spam-Score"] = []string{ + fmt.Sprintf("%v / %v", + resp.Score, resp.RequiredScore)} + case "rewrite subject": + m.Header["Subject"] = []string{resp.Subject} + case "reject": + rejectReason = "550 message rejected" + case "greylist": + rejectReason = "421 greylisted" + case "soft reject": + rejectReason = "451 try again later" + } + // Write DKIM-Signature header first if present + if resp.DKIMSig != "" { + stdout.Printf("filter-dataline|%s|%s|%s: %s\n", + token, id, "DKIM-Signature", resp.DKIMSig) + } + // preserve order? + for k, v := range m.Header { + stdout.Printf("filter-dataline|%s|%s|%s: %s\n", + token, id, k, strings.Join(v, ",")) + } + // Blank line seperates headers and body + stdout.Printf("filter-dataline|%s|%s|\n", token, id) + s := bufio.NewScanner(m.Body) + for s.Scan() { + stdout.Printf("filter-dataline|%s|%s|%s\n", + token, id, s.Text()) + } + stdout.Printf("filter-dataline|%s|%s|%s\n", token, id, ".") + ch <- rejectReason + }() + return ch +} + +func main() { + log.SetFlags(0) + log.SetPrefix("filter_rspamd: ") + stdout = log.New(os.Stdout, "", 0) + registry := map[string]struct { + kind string + fn func(*session, []string) + }{ + "link-connect": {"report", linkConnect}, + "link-disconnect": {"report", nil}, + "link-identify": {"report", linkIdentify}, + "tx-begin": {"report", txBegin}, + "tx-data": {"report", txData}, + "tx-mail": {"report", txMail}, + "tx-rcpt": {"report", txRcpt}, + "tx-commit": {"report", txCleanup}, + "tx-rollback": {"report", txCleanup}, + "commit": {"filter", filterCommit}, + "data-line": {"filter", filterDataLine}, + } + for k, v := range registry { + fmt.Printf("register|%s|smtp-in|%s\n", v.kind, k) + } + fmt.Println("register|ready") + sessions := map[string]*session{} + var event, id string + stdin := bufio.NewScanner(os.Stdin) + for stdin.Scan() { + fields := strings.Split(stdin.Text(), "|") + switch fields[0] { + case "report": + id = fields[5] + case "filter": + id = fields[6] + default: + log.Fatalf("Unknown kind: %s", fields[0]) + } + event = fields[4] + switch event { + case "link-disconnect": + delete(sessions, id) + case "link-connect": + sessions[id] = &session{ + control: map[string]string{}, + id: id, + payload: &strings.Builder{}} + fallthrough + default: + registry[event].fn(sessions[id], fields) + } + } +}