From d22a61311a7d19667e781e6d8026e24395ef3087 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Tue, 10 Apr 2012 17:35:38 +0200 Subject: Lafargue: Add XMPP notification capability. --- src/mulk/benki/lazychat.clj | 40 ++++++++++++++++++++--- src/mulk/benki/main.clj | 7 +++-- src/mulk/benki/xmpp.clj | 77 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 src/mulk/benki/xmpp.clj (limited to 'src') diff --git a/src/mulk/benki/lazychat.clj b/src/mulk/benki/lazychat.clj index e2caaf8..ecc4ac1 100644 --- a/src/mulk/benki/lazychat.clj +++ b/src/mulk/benki/lazychat.clj @@ -21,7 +21,8 @@ [lamina.core :as lamina] [aleph.http :as ahttp] [aleph.formats :as aformats] - [clojure.data.json :as json]) + [clojure.data.json :as json] + [mulk.benki.xmpp :as xmpp]) (:import [org.apache.abdera Abdera])) @@ -29,6 +30,29 @@ (defonce lafargue-events (channel)) +(defmethod xmpp/format-message ::lafargue-message + [message] + (fmt nil "<~A>\n\n~A" (:first_name message) (:content message))) + +(defn determine-targets [message] + (letfn [(protected-targets [] + (with-dbt + (map :id (query "SELECT id FROM users WHERE status IN ('admin', 'approved')"))))] + (into #{} + (concat (:targets message) + (case (keyword (:visibility message)) + :personal nil + :protected (protected-targets) + :public (cons nil (protected-targets))))))) + +(defn push-message-to-xmpp [msg] + (let [targets (filter integer? (determine-targets msg))] + (enqueue xmpp/messages {:message msg, + :targets targets}))) + +(defn start-xmpp-pump [] + (receive-all lafargue-events push-message-to-xmpp)) + (defn create-lazychat-message! [{content :content, visibility :visibility format :format, targets :targets, referees :referees, id :id}] @@ -53,10 +77,12 @@ [:message :target] [id (int target)])) (enqueue lafargue-events - {:content content, :visibility visibility, - :format format, :targets targets, - :referees referees, :id id, - :author *user*, :date (java.util.Date.)}))))) + (with-meta + {:content content, :visibility visibility, + :format format, :targets targets, + :referees referees, :id id, + :author *user*, :date (java.util.Date.)} + {:type ::lafargue-message})))))) (defn select-message [id] (let [message (query1 "SELECT author, content, format, visibility, date @@ -231,3 +257,7 @@ (with-auth (response/json (with-dbt (query1 "SELECT NEXTVAL('lazychat_messages_id_seq')"))))) + +(defn init-lazychat! [] + (future + (receive-all lafargue-events push-message-to-xmpp))) diff --git a/src/mulk/benki/main.clj b/src/mulk/benki/main.clj index 8a7f094..031a500 100644 --- a/src/mulk/benki/main.clj +++ b/src/mulk/benki/main.clj @@ -5,7 +5,7 @@ [hiccup core page-helpers] [mulk.benki util config db]) (:require [noir server options] - [mulk.benki wiki auth book_marx id lazychat] + [mulk.benki wiki auth book_marx id lazychat xmpp] [ring.middleware.file] [noir.session :as session] [noir.request :as request] @@ -92,7 +92,10 @@ :websocket true}))) (defonce server - (future (run-server))) + (do + (future (mulk.benki.xmpp/init-xmpp!)) + (future (mulk.benki.lazychat/init-lazychat!)) + (future (run-server)))) (defn -main [& args] (loop [] diff --git a/src/mulk/benki/xmpp.clj b/src/mulk/benki/xmpp.clj new file mode 100644 index 0000000..4f967ba --- /dev/null +++ b/src/mulk/benki/xmpp.clj @@ -0,0 +1,77 @@ +(ns mulk.benki.xmpp + (:refer-clojure) + (:use [clojure repl] + [noir core] + [noir-async core] + [mulk.benki auth config db util webutil] + ;; + [clojure.core.match :only [match]] + [lamina.core :only [channel enqueue enqueue-and-close receive-all + map* filter*]]) + (:require [clojure.algo.monads :as m] + [clojure.java.jdbc :as sql] + [clojure.string :as string] + [lamina.core :as lamina] + [clojure.data.json :as json]) + (:import [org.jivesoftware.smack ConnectionConfiguration + ConnectionConfiguration$SecurityMode + XMPPConnection + MessageListener])) + + +(defonce xmpp (atom nil)) +(defonce messages (channel)) + + +(defn- connect [] + (let [xmpp-config (:xmpp @benki-config) + connection-config (doto (ConnectionConfiguration. (:server xmpp-config) + (:port xmpp-config) + (:service-name xmpp-config)) + (.setSecurityMode (case (:tls xmpp-config) + true ConnectionConfiguration$SecurityMode/enabled + false ConnectionConfiguration$SecurityMode/disabled + :required ConnectionConfiguration$SecurityMode/required)) + (.setVerifyRootCAEnabled true) + (.setVerifyChainEnabled true) + ;;(.setCompressionEnabled true) + (.setSASLAuthenticationEnabled true))] + (doto (XMPPConnection. connection-config) + (.connect) + (.login (:user xmpp-config) (:password xmpp-config) (:resource xmpp-config))))) + +(defn reconnect! [] + (swap! xmpp #(do (when % + (.disconnect %)) + (connect)))) + +(defmulti format-message type) + +(defn- ->pgarray [coll] + (fmt nil "{~{~A~^,~}}" coll)) + +(defn- push-message [targets message] + (let [chat-manager (.getChatManager @xmpp) + notification (format-message message) + recipients (with-dbt + (map :jid + (query "SELECT jid FROM user_jids j INNER JOIN unnest(?::INTEGER[]) t ON j.user = t" + (->pgarray targets))))] + (doseq [recipient recipients] + (future + (let [chat (.createChat chat-manager + recipient + (reify MessageListener + (processMessage [self chat message] + nil)))] + (.sendMessage chat notification)))))) + +(defn- startup-client [] + (receive-all messages + (fn [{targets :targets, msg :message}] + (push-message targets msg)))) + +(defn init-xmpp! [] + (future + (reconnect!) + (startup-client))) -- cgit v1.2.3