Täna kasvavad ja kogunevad andmed senisest kiiremini. Praegu on umbes 90% meie maailmas loodud andmetest loodud viimase kahe aasta jooksul. Selle määra kasvu tõttu on platvormid Suured andmed nii suure andmemahu säilitamiseks pidid nad kasutama radikaalseid lahendusi.
vedru mvc koos vedruga
Üks olulisemaid andmeallikaid on tänapäeval sotsiaalmeedia. Lubage mul näidata näide reaalsest elust: sotsiaalmeedia andmete reaalajas teabe haldamine, analüüsimine ja väljavõtmine, kasutades üht ökolahendust Suured andmed kõige olulisemad asjad seal - Apache Spark ja Python.
Selles artiklis näitan teile, kuidas luua lihtne rakendus, mis loeb Pythoni abil Twitteri veebikanaleid, seejärel töötleb säutse Apache Sparki voogesitus tuvastada räsimärgid ja lõpuks tagastada kõige olulisemad trendirõhulised räsimärgid ja renderdada need andmed armatuurlaual reaalajas.
Twitterist säutsude saamiseks peate registreeruma aadressil TwitterApps Klõpsates nuppu 'Loo uus rakendus' ja pärast alloleva vormi täitmist klõpsake nuppu 'Loo oma Twitteri rakendus'.
Teiseks minge oma hiljuti loodud rakendusse ja avage aken 'Juurdepääsu tunnused ja võtmed'. Seejärel klõpsake nuppu 'Loo minu juurdepääsutunnus'.
Teie uued sisselogimis-ID-d kuvatakse nagu allpool näidatud.
Ja nüüd olete valmis järgmiseks sammuks.
Selles etapis näitan teile, kuidas koostada lihtne klient, kes tõmbab Pythoni abil Twitteri API-st tweetid ja edastab need seejärel eksemplarile Säde voogesitus . Seda peaks olema lihtne järgida pythoni arendaja professionaalne.
Kõigepealt loome faili nimega twitter_app.py
ja siis lisame koodi kokku, nagu allpool näha.
kuidas arvutada kõne hind
Importige teegid, mida hakkame kasutama, nagu allpool näha:
import socket import sys import requests import requests_oauthlib import json
Ja lisage muutujad, mida OAuth kasutab Twitteriga ühenduse loomiseks, nagu allpool näha:
# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Nüüd loome uue funktsiooni nimega get_tweets
mis kutsub Twitteri API URL-i ja tagastab vastuse tweetide stringile.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Seejärel loote funktsiooni, mis võtab vastuse ülaltoodud vaatest ja eraldab tweetide teksti täielike säutsude JSON-objektist. Pärast seda saatke iga säuts eksemplarile Säde voogesitus (arutatakse hiljem) TCP-ühenduse kaudu.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Nüüd teeme põhiosa. See muudab rakenduse hostiks ühendused pistikupesa , millega see hiljem ühendub Säde . Konfigureerime siin IP-i väärtuseks localhost
kuna kõik töötab samal masinal ja sadamas 9009
. Seejärel helistame meetodile get_tweets
, mida me tegime ülal, et saada Twitteri säutse ja edastada teie vastus ühendusega pistikupesa a send_tweets_to_spark
säutsud Sparki saata.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Ehitame oma rakenduse Säde voogesitus , mis töötleb sissetulevate säutsude jaoks reaalajas, eraldab neist räsimärgid ja arvutab, kui palju räsimärke on mainitud.
Esiteks peame looma eksemplari Sädeme kontekst sc
, siis loome Voogesituse kontekst ssc
/ _ + _ | kahesekundilise intervalliga, mis teostab teisenduse kõigis iga kahe sekundi tagant vastuvõetud ülekannetes. Pange tähele, et logi tasemeks määrati sc
enamiku kirjutatavate logide keelamiseks Säde .
Siin määratleme kontrollpunkti, et võimaldada perioodilist RDD kontrolli; seda on meie rakenduses kohustuslik kasutada, kuna kasutame olekulisi tulekahjude teisendusi (arutleme hiljem samas osas).
Seejärel määratleme oma peamise DStreami dataStreami, mis ühendab serveri pistikupesa mille lõime sadamas varem ERROR
ja see loeb selle sadama säutse. Iga DStreami kirje on säuts.
9009
Nüüd määratleme oma teisendusloogika. Kõigepealt jagame kõik säutsud sõnadesse ja paneme need RDD sõnadesse. Seejärel filtreerime kõigist sõnadest ainult räsimärgid ja joonistame need from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)
juurde ja panime need RDD hashtagidesse.
edukustasud kapitali kaasamise eest
Siis peame arvutama, mitu korda on räsimärki mainitud. Saame seda teha funktsiooni (hashtag, 1)
abil. See funktsioon arvutab, mitu korda on iga rühm maininud hashtagi, see tähendab, et see lähtestab igas grupis konto.
Meie puhul peame arvutama arvud kõigis rühmades, nii et kasutame veel ühte funktsiooni nimega reduceByKey
kuna see funktsioon võimaldab teil säilitada RDD olekut, värskendades seda uute andmetega. Seda vormi nimetatakse updateStateByKey
.
Pange tähele, et Stateful Transformation
kasutamiseks peate konfigureerima kontrollpunkti ja eelmises etapis tehtud toimingud.
updateStateByKey
# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()
võtab funktsiooni parameetrina, mida nimetatakse funktsiooniks updateStateByKey
See käivitatakse RDD igale üksusele ja täidab soovitud loogikat.
Meie puhul oleme loonud värskendusfunktsiooni nimega update
mis liidab kõik räsimärgid aggregate_tags_count
(uued väärtused) ja lisab need new_values
(kogusumma), mis on kõigi rühmade summa ja salvestab andmed RDD-sse total_sum
tags_totals
Siis tegeleme RDD töötlemisega def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
igas rühmas, et oleks võimalik selle abil ajutiseks tabeliks teisendada Säde SQL-kontekst ja pärast seda tehke avaldus, et saaksite kümme parimat räsimärki oma kontodega võtta ja panna need andmete tags_totals
raami.
hashtag_counts_df
Meie Sparki rakenduse viimane samm on andmeraami def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
saatmine armatuurlaua rakendusse. Seega teisendame andmeraami kaheks maatriksiks, üheks räsimärkide ja teise nende kontode jaoks. Seejärel liigume armatuurlaua rakendusse REST API kaudu.
hashtag_counts_df
Lõpuks on siin valim väljundist Säde voogesitus jooksu ajal ja printides def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
. Märkate, et väljund prinditakse täpselt iga kahe sekundi tagant iga grupi intervalli kohta.
Nüüd loome lihtsa armatuurlaua rakenduse, mida Spark reaalajas värskendab. Ehitame selle Pythoni, Flaski ja Charts.js .
Kõigepealt loome Pythoni projekti allpool toodud struktuuriga, laadime alla ja lisame faili Chart.js staatilises kataloogis.
Seejärel loome failis hashtag_counts_df
funktsiooni nimega app.py
, mille Spark kutsub URL-i kaudu update_data
globaalsete siltide ja väärtusmassiivide värskendamiseks.
Samamoodi on funktsioon http://localhost:5001/updateData
see on loodud kutsuma AJAX-i taotlusega tagastada uued värskendatud sildid ja väärtusmassiivid kui JSON. Funktsioon refresh_graph_data
lahkub lehelt get_chart_page
kui kutsutakse.
chart.html
Nüüd loome faili from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
lihtsa graafiku et oleks võimalik hashtagi andmeid näidata ja neid reaalajas värskendada. Nagu allpool määratletud, peame importima JavaScripti teegid chart.html
ja Chart.js
.
Sildi põhiosas peame looma lõuendi ja andma sellele id, et oleks võimalik graafiku kuvamise ajal sellele viidata, kui kasutate JavaScripti järgmises etapis.
jquery.min.js
Nüüd loome graafiku, kasutades allpool olevat JavaScripti koodi. Kõigepealt võtame lõuendi elemendi ja seejärel loome uue graafi objekti, edastame sellele lõuendi elemendi ja määratleme andmeobjekti, nagu allpool näha.
Pange tähele, et andmesildid on ühendatud siltide ja väärtusemuutujatega, mis tagastatakse lehelt lahkumisel, helistades Top Trending Twitter Hashtags
failis Top Trending Twitter Hashtags
get_chart_page
.
kuidas saada krediitkaardiandmeid
Viimane osa on funktsioon, mis on konfigureeritud tegema Ajaxi päringu iga sekundi järel ja kutsuma URL app.py
, mis käivitab /refreshData
aastal refresh_graph_data
ja see tagastab uued värskendatud andmed ning värskendab seejärel graafikut, mille uued andmed jätavad.
app.py
Käivitame kolm rakendust allpool toodud järjekorras: 1. Twitter App Client. 2. Säde rakendus. 3. Armatuurlaua veebirakendus.
Seejärel pääsete URL-i otsides juhtpaneelile reaalajas juurde
Nüüd näete oma graafiku värskendamist allpool:
Oleme õppinud tegema lihtsat andmete analüüsi reaalajas, kasutades Spark Streamingut, ja integreerima need otse lihtsa juhtpaneeliga, kasutades RESTful veebiteenust. Selle näite põhjal näeme, kui võimas on Spark, kuna see hõivab tohutu andmevoo, muudab seda ja eraldab väärtuslikku teavet, mida saab hõlpsalt kasutada lühikese aja jooksul otsuste langetamiseks. On palju kasulikke kasutusjuhtumeid, mida on võimalik rakendada ja mis võivad teenida erinevaid tööstusharusid, näiteks uudised või turundus.
Uudistetööstuse näide
Saame jälgida kõige sagedamini mainitud räsimärke, et teada saada, millistel teemadel inimesed sotsiaalmeedias räägivad. Samuti saame jälgida konkreetseid räsimärke ja nende säutse, et teada saada, mida inimesed konkreetsete teemade või sündmuste kohta maailmas räägivad.
Turunduse näide
Saame koguda säutsu edastamist ja arvamusanalüüsi tehes kategoriseerida need ja määratleda inimeste huvid, et tuua neile nende huvidega seotud pakkumisi.
küberjulgeoleku probleemid ja lahendused
Samuti on palju kasutusjuhtumeid, mida saab rakendada spetsiaalselt analüütika jaoks. Suured andmed ja need võivad teenida paljusid tööstusharusid. Üldiste Apache Sparki kasutamise juhtumite jaoks soovitan teil vaadata ühte meie eelmised postitused .
Soovitan teil lugeda lähemalt Säde voogesitus siin selle võimaluste kohta lisateabe saamiseks ja andmete kasutamisel reaalajas lisateabe saamiseks täpsemate andmete teisendamiseks.