filter_rspamd.go
author Sunil Nimmagadda <sunil@nimmagadda.net>
Fri, 26 Jul 2019 12:15:53 +0500
changeset 4 9fc3f303d39a
parent 3 0b7061027b64
permissions -rw-r--r--
Return TempFail on errors. gilles@ pointed out that a TempFail suffices in case of HTTP POST failure to rspamd instead of bringing down the daemon.

// Copyright (c) 2019 Sunil Nimmagadda <sunil@nimmagadda.net>
//
// Permission to use, copy, modify, and distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/mail"
	"os"
	"strings"
)

const rspamdURL = "http://localhost:11333/checkv2"

var stdout *log.Logger

type session struct {
	ch      <-chan string
	control map[string]string
	id      string
	payload *strings.Builder
}

type rspamdResponse struct {
	Score         float32
	RequiredScore float32 `json:"required_score"`
	Subject       string
	Action        string
	DKIMSig       string `json:"dkim-signature"`
}

func linkConnect(s *session, args []string) {
	rdns, laddr := args[6], args[8]
	s.control["Pass"] = "all"
	p := strings.Split(laddr, ":")
	if p[0] != "local" {
		s.control["Ip"] = p[0]
	}
	if rdns != "" {
		s.control["Hostname"] = rdns
	}
}

func linkIdentify(s *session, args []string) {
	s.control["Helo"] = args[6]
}

func txBegin(s *session, args []string) {
	s.control["Queue-Id"] = args[6]
}

func txMail(s *session, args []string) {
	mailFrom, status := args[7], args[8]
	if status == "ok" {
		s.control["From"] = mailFrom
	}
}

func txRcpt(s *session, args []string) {
	rcptTo, status := args[7], args[8]
	if status == "ok" {
		s.control["Rcpt"] = rcptTo
	}
}

func txData(s *session, args []string) {
	status := args[7]
	if status == "ok" {
		s.control = nil
	}
}

func txCleanup(s *session, args []string) {
	s.control = nil
}

func filterCommit(s *session, args []string) {
	token := args[6]
	reason := <-s.ch
	if reason != "" {
		stdout.Printf("filter-result|%s|%s|reject|%s\n",
			token, s.id, reason)
		return
	}
	stdout.Printf("filter-result|%s|%s|proceed\n", token, s.id)
}

func filterDataLine(s *session, args []string) {
	token, line := args[6], args[7]
	if line != "." {
		s.payload.WriteString(line)
		s.payload.WriteString("\n")
		return
	}
	s.ch = dataOutput(s.control, token, s.id, s.payload.String())
}

func rspamdPost(hdrs map[string]string, data string) (*rspamdResponse, error) {
	r := strings.NewReader(data)
	client := &http.Client{}
	req, err := http.NewRequest("POST", rspamdURL, r)
	if err != nil {
		return nil, err
	}
	for k, v := range hdrs {
		req.Header.Add(k, v)
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	rr := &rspamdResponse{}
	if err := json.NewDecoder(resp.Body).Decode(rr); err != nil {
		return nil, err
	}
	return rr, nil
}

func dataOutput(headers map[string]string,
	token, id, data string) <-chan string {
	ch := make(chan string)
	go func() {
		resp, err := rspamdPost(headers, data)
		if err != nil {
			ch <- "421 Temporary failure"
			return
		}
		log.Printf("%v\n", resp)
		m, err := mail.ReadMessage(strings.NewReader(data))
		if err != nil {
			ch <- "421 Temporary failure"
			return
		}
		rejectReason := ""
		switch resp.Action {
		case "add header":
			m.Header["X-Spam"] = []string{"yes"}
			m.Header["X-Spam-Score"] = []string{
				fmt.Sprintf("%v / %v",
					resp.Score, resp.RequiredScore)}
		case "rewrite subject":
			m.Header["Subject"] = []string{resp.Subject}
		case "reject":
			rejectReason = "550 message rejected"
		case "greylist":
			rejectReason = "421 greylisted"
		case "soft reject":
			rejectReason = "451 try again later"
		}
		// Write DKIM-Signature header first if present
		if resp.DKIMSig != "" {
			stdout.Printf("filter-dataline|%s|%s|%s: %s\n",
				token, id, "DKIM-Signature", resp.DKIMSig)
		}
		// preserve order?
		for k, v := range m.Header {
			stdout.Printf("filter-dataline|%s|%s|%s: %s\n",
				token, id, k, strings.Join(v, ","))
		}
		// Blank line seperates headers and body
		stdout.Printf("filter-dataline|%s|%s|\n", token, id)
		s := bufio.NewScanner(m.Body)
		for s.Scan() {
			stdout.Printf("filter-dataline|%s|%s|%s\n",
				token, id, s.Text())
		}
		stdout.Printf("filter-dataline|%s|%s|%s\n", token, id, ".")
		ch <- rejectReason
	}()
	return ch
}

func main() {
	log.SetFlags(0)
	log.SetPrefix("filter_rspamd: ")
	stdout = log.New(os.Stdout, "", 0)
	registry := map[string]struct {
		kind string
		fn   func(*session, []string)
	}{
		"link-connect":    {"report", linkConnect},
		"link-disconnect": {"report", nil},
		"link-identify":   {"report", linkIdentify},
		"tx-begin":        {"report", txBegin},
		"tx-data":         {"report", txData},
		"tx-mail":         {"report", txMail},
		"tx-rcpt":         {"report", txRcpt},
		"tx-commit":       {"report", txCleanup},
		"tx-rollback":     {"report", txCleanup},
		"commit":          {"filter", filterCommit},
		"data-line":       {"filter", filterDataLine},
	}
	for k, v := range registry {
		fmt.Printf("register|%s|smtp-in|%s\n", v.kind, k)
	}
	fmt.Println("register|ready")
	sessions := map[string]*session{}
	var event, id string
	stdin := bufio.NewScanner(os.Stdin)
	for stdin.Scan() {
		fields := strings.Split(stdin.Text(), "|")
		event, id = fields[4], fields[5]
		switch event {
		case "link-disconnect":
			delete(sessions, id)
		case "link-connect":
			sessions[id] = &session{
				control: map[string]string{},
				id:      id,
				payload: &strings.Builder{}}
			fallthrough
		default:
			if sessions[id] != nil {
				registry[event].fn(sessions[id], fields)
			}
		}
	}
}