blob: c23f61379f0bd70e0a36e8a459fbce58a915102c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
(ns mulk.benki.xmpp
(:refer-clojure)
(:use [clojure repl]
[noir core]
[noir-async core]
[mulk.benki auth config db util webutil]
;;
[clojure.core.match :only [match]]
[lamina.core :only [channel enqueue enqueue-and-close receive-all
map* filter*]])
(:require [clojure.algo.monads :as m]
[clojure.java.jdbc :as sql]
[clojure.string :as string]
[lamina.core :as lamina]
[clojure.data.json :as json])
(:import [org.jivesoftware.smack ConnectionConfiguration
ConnectionConfiguration$SecurityMode
XMPPConnection
MessageListener
ChatManagerListener]))
;; Holds the current Smack XMPPConnection, or nil until `reconnect!` has
;; installed one.
(defonce xmpp (atom nil))
;; Outgoing notification queue.  `startup-client` consumes maps carrying
;; :targets and :message keys from this channel.
(defonce messages (channel))
;; Incoming chat messages.  The message listeners in this file enqueue maps
;; of the form {:sender ..., :body ...} here, skipping bodies that start
;; with "?OTR:".
(defonce messages-in (channel))
(defn- connect
  "Opens a new XMPP connection using the :xmpp section of `benki-config`,
  logs in, and returns the connected XMPPConnection.

  Config keys read: :server, :port, :service-name, :tls (true, false or
  :required), :verify-cert, :sasl, :user, :password and :resource.  Any
  other :tls value makes the `case` throw, as there is no default clause."
  []
  (let [{:keys [server port service-name tls verify-cert sasl
                user password resource]} (:xmpp @benki-config)
        settings (ConnectionConfiguration. server port service-name)
        tls-mode (case tls
                   true      ConnectionConfiguration$SecurityMode/enabled
                   false     ConnectionConfiguration$SecurityMode/disabled
                   :required ConnectionConfiguration$SecurityMode/required)]
    (.setSecurityMode settings tls-mode)
    ;; A single :verify-cert flag drives both certificate checks.
    (.setVerifyRootCAEnabled settings verify-cert)
    (.setVerifyChainEnabled settings verify-cert)
    ;;(.setCompressionEnabled settings true)
    (.setSASLAuthenticationEnabled settings sasl)
    (let [connection (XMPPConnection. settings)]
      (.connect connection)
      (.login connection user password resource)
      connection)))
(defn reconnect!
  "Disconnects the current XMPP connection, if there is one, then opens a
  fresh one via `connect` and stores it in the `xmpp` atom.  Returns the new
  connection.

  Disconnecting and connecting perform network I/O, so they must not run
  inside the function handed to `swap!`: that function may be invoked more
  than once when the atom is contended, which would open several
  connections and leak all but the last.  The sequence is therefore
  serialised on the atom's monitor and the result installed with `reset!`."
  []
  (locking xmpp
    (when-let [old-connection @xmpp]
      (.disconnect old-connection))
    (reset! xmpp (connect))))
;; Produces the notification payload that `push-message` hands to
;; `.sendMessage`.  Dispatches on the `type` of its argument; no methods are
;; defined in the visible part of this file, so implementations presumably
;; live elsewhere.
(defmulti format-message type)
;; Renders `coll` as the text that `push-message` binds to the
;; `?::INTEGER[]` query parameter.
;; NOTE(review): assumes `fmt` follows cl-format conventions (a nil
;; destination returns the string, and ~{~A~^,~} joins the elements with
;; commas), so [1 2 3] would become "{1,2,3}" -- confirm in mulk.benki.util.
;; Elements are not quoted or escaped here.
(defn- ->pgarray [coll]
(fmt nil "{~{~A~^,~}}" coll))
(defn- push-message
  "Sends `message`, rendered through `format-message`, to every JID
  registered in user_jids for the user ids in `targets`.  Returns nil.

  One chat is created per recipient, each on its own future so a slow
  recipient does not delay the others.  The listener attached to each chat
  forwards replies to `messages-in`, skipping bodies that start with
  \"?OTR:\"."
  [targets message]
  (let [chat-manager (.getChatManager @xmpp)
        notification (format-message message)
        ;; mapv rather than map: the rows must be fully read while the
        ;; `with-dbt` scope is still open.  A lazy seq would escape the scope
        ;; unrealised and only be consumed by the doseq below.
        recipients   (with-dbt
                       (mapv :jid
                             (query "SELECT jid FROM user_jids j INNER JOIN unnest(?::INTEGER[]) t ON j.user = t"
                                    (->pgarray targets))))]
    (doseq [recipient recipients]
      (future
        (let [chat (.createChat chat-manager
                                recipient
                                (reify MessageListener
                                  (processMessage [self chat message]
                                    (when-let [body (.getBody message)]
                                      (when-not (re-find #"^\?OTR:" body)
                                        (enqueue messages-in {:sender recipient, :body body}))))))]
          (.sendMessage chat notification))))))
(defn- startup-client
  "Subscribes to the `messages` channel and passes the :targets and
  :message of every entry received on it to `push-message`."
  []
  (let [dispatch (fn [{:keys [targets message]}]
                   (push-message targets message))]
    (receive-all messages dispatch)))
(defn- handle-incoming-chats
  "Registers a chat listener on the current connection's chat manager so
  that messages arriving in chats not created locally are forwarded to
  `messages-in` as {:sender <participant> :body <text>}.  Bodies starting
  with \"?OTR:\" are dropped.  Returns the chat manager."
  []
  (let [manager       (.getChatManager @xmpp)
        ;; Builds a fresh listener for each chat, as the inline reify did.
        make-listener (fn []
                        (reify MessageListener
                          (processMessage [_ chat message]
                            (let [body (.getBody message)]
                              (when (and body (not (re-find #"^\?OTR:" body)))
                                (enqueue messages-in {:sender (.getParticipant chat) :body body}))))))]
    (.addChatListener
     manager
     (reify ChatManagerListener
       (chatCreated [_ chat local?]
         ;; Locally created chats are skipped here.
         (when-not local?
           (.addMessageListener chat (make-listener))))))
    manager))
(defn init-xmpp!
  "Starts the XMPP subsystem on a background thread and returns the future
  running it.  The steps run in order: connect, register the incoming-chat
  listener, then start consuming the outgoing `messages` channel."
  []
  (let [boot (fn []
               (reconnect!)
               (handle-incoming-chats)
               (startup-client))]
    (future-call boot)))
|