filter_rspamd.go
changeset 0 7671ae88de2a
child 1 8a09170cd1e0
equal deleted inserted replaced
-1:000000000000 0:7671ae88de2a
       
     1 // Copyright (c) 2019 Sunil Nimmagadda <sunil@nimmagadda.net>
       
     2 //
       
     3 // Permission to use, copy, modify, and distribute this software for any
       
     4 // purpose with or without fee is hereby granted, provided that the above
       
     5 // copyright notice and this permission notice appear in all copies.
       
     6 //
       
     7 // THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
       
     8 // WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
       
     9 // MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
       
    10 // ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
       
    11 // WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
       
    12 // ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
       
    13 // OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
       
    14 
       
    15 package main
       
    16 
       
    17 import (
       
    18 	"bufio"
       
    19 	"encoding/json"
       
    20 	"fmt"
       
    21 	"log"
       
    22 	"net/http"
       
    23 	"net/mail"
       
    24 	"os"
       
    25 	"strings"
       
    26 )
       
    27 
       
// rspamdURL is the HTTP endpoint that rspamdPost submits every message to
// for scanning.
const rspamdURL = "http://localhost:11333/checkv2"
       
    29 
       
// stdout is the logger used to write filter responses. main points it at
// os.Stdout with no prefix and no flags, so every Printf call produces
// exactly the formatted line.
var stdout *log.Logger
       
    31 
       
// session is the state kept for one session id. main creates it on
// link-connect and removes it on link-disconnect.
type session struct {
	// ch delivers the reject reason (empty when there is none) from the
	// goroutine started by dataOutput to filterCommit.
	ch      <-chan string
	// control holds the values that rspamdPost sends as HTTP request
	// headers.
	control map[string]string
	// id is the session id taken from the event line.
	id      string
	// payload accumulates the message lines received by filterDataLine.
	payload *strings.Builder
}
       
    38 
       
// rspamdResponse is the part of rspamd's JSON reply that rspamdPost decodes
// and dataOutput acts on.
type rspamdResponse struct {
	// Score and RequiredScore are reported together in the X-Spam-Score
	// header when Action is "add header".
	Score         float32
	RequiredScore float32 `json:"required_score"`
	// Subject replaces the message subject when Action is
	// "rewrite subject".
	Subject       string
	// Action selects how dataOutput treats the message.
	Action        string
	// DKIMSig, when non-empty, is written out as a DKIM-Signature header.
	DKIMSig       string `json:"dkim-signature"`
}
       
    46 
       
    47 func linkConnect(s *session, args []string) {
       
    48 	rdns, laddr := args[6], args[8]
       
    49 	s.control["Pass"] = "all"
       
    50 	p := strings.Split(laddr, ":")
       
    51 	if p[0] != "local" {
       
    52 		s.control["Ip"] = p[0]
       
    53 	}
       
    54 	if rdns != "" {
       
    55 		s.control["Hostname"] = rdns
       
    56 	}
       
    57 }
       
    58 
       
    59 func linkIdentify(s *session, args []string) {
       
    60 	s.control["Helo"] = args[6]
       
    61 }
       
    62 
       
    63 func txBegin(s *session, args []string) {
       
    64 	s.control["Queue-Id"] = args[6]
       
    65 }
       
    66 
       
    67 func txMail(s *session, args []string) {
       
    68 	mail_from, status := args[7], args[8]
       
    69 	if status == "ok" {
       
    70 		s.control["From"] = mail_from
       
    71 	}
       
    72 }
       
    73 
       
    74 func txRcpt(s *session, args []string) {
       
    75 	rcpt_to, status := args[7], args[8]
       
    76 	if status == "ok" {
       
    77 		s.control["Rcpt"] = rcpt_to
       
    78 	}
       
    79 }
       
    80 
       
    81 func txData(s *session, args []string) {
       
    82 	status := args[7]
       
    83 	if status == "ok" {
       
    84 		s.control = nil
       
    85 	}
       
    86 }
       
    87 
       
    88 func txCleanup(s *session, args []string) {
       
    89 	s.control = nil
       
    90 }
       
    91 
       
    92 func filterCommit(s *session, args []string) {
       
    93 	token := args[5]
       
    94 	reason := <-s.ch
       
    95 	if reason != "" {
       
    96 		stdout.Printf("filter-result|%s|%s|reject|%s\n",
       
    97 			token, s.id, reason)
       
    98 		return
       
    99 	}
       
   100 	stdout.Printf("filter-result|%s|%s|proceed\n", token, s.id)
       
   101 }
       
   102 
       
   103 func filterDataLine(s *session, args []string) {
       
   104 	token, line := args[5], args[7]
       
   105 	if line != "." {
       
   106 		s.payload.WriteString(line)
       
   107 		s.payload.WriteString("\n")
       
   108 		return
       
   109 	}
       
   110 	s.ch = dataOutput(s.control, token, s.id, s.payload.String())
       
   111 }
       
   112 
       
   113 func rspamdPost(hdrs map[string]string, data string) (*rspamdResponse, error) {
       
   114 	r := strings.NewReader(data)
       
   115 	client := &http.Client{}
       
   116 	req, err := http.NewRequest("POST", rspamdURL, r)
       
   117 	if err != nil {
       
   118 		return nil, err
       
   119 	}
       
   120 	for k, v := range hdrs {
       
   121 		req.Header.Add(k, v)
       
   122 	}
       
   123 	resp, err := client.Do(req)
       
   124 	if err != nil {
       
   125 		return nil, err
       
   126 	}
       
   127 	defer resp.Body.Close()
       
   128 	rr := &rspamdResponse{}
       
   129 	if err := json.NewDecoder(resp.Body).Decode(rr); err != nil {
       
   130 		return nil, err
       
   131 	}
       
   132 	return rr, nil
       
   133 }
       
   134 
       
   135 func dataOutput(headers map[string]string,
       
   136 	token, id, data string) <-chan string {
       
   137 	ch := make(chan string)
       
   138 	go func() {
       
   139 		resp, err := rspamdPost(headers, data)
       
   140 		if err != nil {
       
   141 			log.Fatal(err)
       
   142 		}
       
   143 		log.Printf("%v\n", resp)
       
   144 		m, err := mail.ReadMessage(strings.NewReader(data))
       
   145 		if err != nil {
       
   146 			log.Fatal(err)
       
   147 		}
       
   148 		rejectReason := ""
       
   149 		switch resp.Action {
       
   150 		case "add header":
       
   151 			m.Header["X-Spam"] = []string{"yes"}
       
   152 			m.Header["X-Spam-Score"] = []string{
       
   153 				fmt.Sprintf("%v / %v",
       
   154 					resp.Score, resp.RequiredScore)}
       
   155 		case "rewrite subject":
       
   156 			m.Header["Subject"] = []string{resp.Subject}
       
   157 		case "reject":
       
   158 			rejectReason = "550 message rejected"
       
   159 		case "greylist":
       
   160 			rejectReason = "421 greylisted"
       
   161 		case "soft reject":
       
   162 			rejectReason = "451 try again later"
       
   163 		}
       
   164 		// Write DKIM-Signature header first if present
       
   165 		if resp.DKIMSig != "" {
       
   166 			stdout.Printf("filter-dataline|%s|%s|%s: %s\n",
       
   167 				token, id, "DKIM-Signature", resp.DKIMSig)
       
   168 		}
       
   169 		// preserve order?
       
   170 		for k, v := range m.Header {
       
   171 			stdout.Printf("filter-dataline|%s|%s|%s: %s\n",
       
   172 				token, id, k, strings.Join(v, ","))
       
   173 		}
       
   174 		// Blank line seperates headers and body
       
   175 		stdout.Printf("filter-dataline|%s|%s|\n", token, id)
       
   176 		s := bufio.NewScanner(m.Body)
       
   177 		for s.Scan() {
       
   178 			stdout.Printf("filter-dataline|%s|%s|%s\n",
       
   179 				token, id, s.Text())
       
   180 		}
       
   181 		stdout.Printf("filter-dataline|%s|%s|%s\n", token, id, ".")
       
   182 		ch <- rejectReason
       
   183 	}()
       
   184 	return ch
       
   185 }
       
   186 
       
   187 func main() {
       
   188 	log.SetFlags(0)
       
   189 	log.SetPrefix("filter_rspamd: ")
       
   190 	stdout = log.New(os.Stdout, "", 0)
       
   191 	registry := map[string]struct {
       
   192 		kind string
       
   193 		fn   func(*session, []string)
       
   194 	}{
       
   195 		"link-connect":    {"report", linkConnect},
       
   196 		"link-disconnect": {"report", nil},
       
   197 		"link-identify":   {"report", linkIdentify},
       
   198 		"tx-begin":        {"report", txBegin},
       
   199 		"tx-data":         {"report", txData},
       
   200 		"tx-mail":         {"report", txMail},
       
   201 		"tx-rcpt":         {"report", txRcpt},
       
   202 		"tx-commit":       {"report", txCleanup},
       
   203 		"tx-rollback":     {"report", txCleanup},
       
   204 		"commit":          {"filter", filterCommit},
       
   205 		"data-line":       {"filter", filterDataLine},
       
   206 	}
       
   207 	for k, v := range registry {
       
   208 		fmt.Printf("register|%s|smtp-in|%s\n", v.kind, k)
       
   209 	}
       
   210 	fmt.Println("register|ready")
       
   211 	sessions := map[string]*session{}
       
   212 	var event, id string
       
   213 	stdin := bufio.NewScanner(os.Stdin)
       
   214 	for stdin.Scan() {
       
   215 		fields := strings.Split(stdin.Text(), "|")
       
   216 		switch fields[0] {
       
   217 		case "report":
       
   218 			id = fields[5]
       
   219 		case "filter":
       
   220 			id = fields[6]
       
   221 		default:
       
   222 			log.Fatalf("Unknown kind: %s", fields[0])
       
   223 		}
       
   224 		event = fields[4]
       
   225 		switch event {
       
   226 		case "link-disconnect":
       
   227 			delete(sessions, id)
       
   228 		case "link-connect":
       
   229 			sessions[id] = &session{
       
   230 				control: map[string]string{},
       
   231 				id:      id,
       
   232 				payload: &strings.Builder{}}
       
   233 			fallthrough
       
   234 		default:
       
   235 			registry[event].fn(sessions[id], fields)
       
   236 		}
       
   237 	}
       
   238 }