apeescape2.com
  • Põhiline
  • Muu
  • Brändikujundus
  • Trendid
  • Puldi Tõus
Andmeteadus Ja Andmebaasid

Apache Sparki voogesituse õpetus: Twitteri populaarsete hashtagide tuvastamine

Täna kasvavad ja kogunevad andmed senisest kiiremini. Praegu on umbes 90% meie maailmas loodud andmetest loodud viimase kahe aasta jooksul. Selle määra kasvu tõttu on platvormid Suured andmed nii suure andmemahu säilitamiseks pidid nad kasutama radikaalseid lahendusi.

vedru mvc koos vedruga

Üks olulisemaid andmeallikaid on tänapäeval sotsiaalmeedia. Lubage mul näidata näide reaalsest elust: sotsiaalmeedia andmete reaalajas teabe haldamine, analüüsimine ja väljavõtmine, kasutades üht ökolahendust Suured andmed kõige olulisemad asjad seal - Apache Spark ja Python.



Apache Sparki voogedastust saab kasutada sotsiaalmeediast teabe hankimiseks, näiteks trendid Twitteri hashtagid



Selles artiklis näitan teile, kuidas luua lihtne rakendus, mis loeb Pythoni abil Twitteri veebikanaleid, seejärel töötleb säutse Apache Sparki voogesitus tuvastada räsimärgid ja lõpuks tagastada kõige olulisemad trendirõhulised räsimärgid ja renderdada need andmed armatuurlaual reaalajas.

Twitteri API-de jaoks oma volituste loomine

Twitterist säutsude saamiseks peate registreeruma aadressil TwitterApps Klõpsates nuppu 'Loo uus rakendus' ja pärast alloleva vormi täitmist klõpsake nuppu 'Loo oma Twitteri rakendus'.



Ekraanipilt: kuidas oma Twitteri rakendust luua

Teiseks minge oma hiljuti loodud rakendusse ja avage aken 'Juurdepääsu tunnused ja võtmed'. Seejärel klõpsake nuppu 'Loo minu juurdepääsutunnus'.

Ekraanipilt: Twitteri rakenduse mandaatide, paroolide ja juurdepääsuidentifikaatorite installimine



Teie uued sisselogimis-ID-d kuvatakse nagu allpool näidatud.

Ekraanipilt: rakenduse Twiiter juurdepääsutunnuste installimine

Ja nüüd olete valmis järgmiseks sammuks.

Ehitage HTTP Twitteri klient

Selles etapis näitan teile, kuidas koostada lihtne klient, kes tõmbab Pythoni abil Twitteri API-st tweetid ja edastab need seejärel eksemplarile Säde voogesitus . Seda peaks olema lihtne järgida pythoni arendaja professionaalne.

Kõigepealt loome faili nimega twitter_app.py ja siis lisame koodi kokku, nagu allpool näha.

kuidas arvutada kõne hind

Importige teegid, mida hakkame kasutama, nagu allpool näha:

import socket import sys import requests import requests_oauthlib import json

Ja lisage muutujad, mida OAuth kasutab Twitteriga ühenduse loomiseks, nagu allpool näha:

# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Nüüd loome uue funktsiooni nimega get_tweets mis kutsub Twitteri API URL-i ja tagastab vastuse tweetide stringile.

def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Seejärel loote funktsiooni, mis võtab vastuse ülaltoodud vaatest ja eraldab tweetide teksti täielike säutsude JSON-objektist. Pärast seda saatke iga säuts eksemplarile Säde voogesitus (arutatakse hiljem) TCP-ühenduse kaudu.

def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + ' ') except: e = sys.exc_info()[0] print('Error: %s' % e)

Nüüd teeme põhiosa. See muudab rakenduse hostiks ühendused pistikupesa , millega see hiljem ühendub Säde . Konfigureerime siin IP-i väärtuseks localhost kuna kõik töötab samal masinal ja sadamas 9009. Seejärel helistame meetodile get_tweets, mida me tegime ülal, et saada Twitteri säutse ja edastada teie vastus ühendusega pistikupesa a send_tweets_to_spark säutsud Sparki saata.

TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)

Apache Sparki voogesitusrakenduse installimine

Ehitame oma rakenduse Säde voogesitus , mis töötleb sissetulevate säutsude jaoks reaalajas, eraldab neist räsimärgid ja arvutab, kui palju räsimärke on mainitud.

Illustratsioon: * Säde voogesitus * võimaldab sissetulevate säutsude töötlemist reaalajas ja hashtagi ekstraktimist

Esiteks peame looma eksemplari Sädeme kontekst sc, siis loome Voogesituse kontekst ssc / _ + _ | kahesekundilise intervalliga, mis teostab teisenduse kõigis iga kahe sekundi tagant vastuvõetud ülekannetes. Pange tähele, et logi tasemeks määrati sc enamiku kirjutatavate logide keelamiseks Säde .

Siin määratleme kontrollpunkti, et võimaldada perioodilist RDD kontrolli; seda on meie rakenduses kohustuslik kasutada, kuna kasutame olekulisi tulekahjude teisendusi (arutleme hiljem samas osas).

Seejärel määratleme oma peamise DStreami dataStreami, mis ühendab serveri pistikupesa mille lõime sadamas varem ERROR ja see loeb selle sadama säutse. Iga DStreami kirje on säuts.

9009

Nüüd määratleme oma teisendusloogika. Kõigepealt jagame kõik säutsud sõnadesse ja paneme need RDD sõnadesse. Seejärel filtreerime kõigist sõnadest ainult räsimärgid ja joonistame need from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009) juurde ja panime need RDD hashtagidesse.

edukustasud kapitali kaasamise eest

Siis peame arvutama, mitu korda on räsimärki mainitud. Saame seda teha funktsiooni (hashtag, 1) abil. See funktsioon arvutab, mitu korda on iga rühm maininud hashtagi, see tähendab, et see lähtestab igas grupis konto.

Meie puhul peame arvutama arvud kõigis rühmades, nii et kasutame veel ühte funktsiooni nimega reduceByKey kuna see funktsioon võimaldab teil säilitada RDD olekut, värskendades seda uute andmetega. Seda vormi nimetatakse updateStateByKey.

Pange tähele, et Stateful Transformation kasutamiseks peate konfigureerima kontrollpunkti ja eelmises etapis tehtud toimingud.

updateStateByKey

# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination() võtab funktsiooni parameetrina, mida nimetatakse funktsiooniks updateStateByKey See käivitatakse RDD igale üksusele ja täidab soovitud loogikat.

Meie puhul oleme loonud värskendusfunktsiooni nimega update mis liidab kõik räsimärgid aggregate_tags_count (uued väärtused) ja lisab need new_values (kogusumma), mis on kõigi rühmade summa ja salvestab andmed RDD-sse total_sum

tags_totals

Siis tegeleme RDD töötlemisega def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0) igas rühmas, et oleks võimalik selle abil ajutiseks tabeliks teisendada Säde SQL-kontekst ja pärast seda tehke avaldus, et saaksite kümme parimat räsimärki oma kontodega võtta ja panna need andmete tags_totals raami.

hashtag_counts_df

Meie Sparki rakenduse viimane samm on andmeraami def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e) saatmine armatuurlaua rakendusse. Seega teisendame andmeraami kaheks maatriksiks, üheks räsimärkide ja teise nende kontode jaoks. Seejärel liigume armatuurlaua rakendusse REST API kaudu.

hashtag_counts_df

Lõpuks on siin valim väljundist Säde voogesitus jooksu ajal ja printides def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data) . Märkate, et väljund prinditakse täpselt iga kahe sekundi tagant iga grupi intervalli kohta.

Näide Twitteri * Säde voogesituse * väljundist, mis on prinditud iga grupi intervalli sätte jaoks

Andmete esitamiseks looge lihtne reaalajas juhtpaneel

Nüüd loome lihtsa armatuurlaua rakenduse, mida Spark reaalajas värskendab. Ehitame selle Pythoni, Flaski ja Charts.js .

Kõigepealt loome Pythoni projekti allpool toodud struktuuriga, laadime alla ja lisame faili Chart.js staatilises kataloogis.

Illustratsioon: looge Pythoni projekt kasutamiseks Twitteri hashtagi analüüsis

Seejärel loome failis hashtag_counts_df funktsiooni nimega app.py, mille Spark kutsub URL-i kaudu update_data globaalsete siltide ja väärtusmassiivide värskendamiseks.

Samamoodi on funktsioon http://localhost:5001/updateData see on loodud kutsuma AJAX-i taotlusega tagastada uued värskendatud sildid ja väärtusmassiivid kui JSON. Funktsioon refresh_graph_data lahkub lehelt get_chart_page kui kutsutakse.

chart.html

Nüüd loome faili from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001) lihtsa graafiku et oleks võimalik hashtagi andmeid näidata ja neid reaalajas värskendada. Nagu allpool määratletud, peame importima JavaScripti teegid chart.html ja Chart.js.

Sildi põhiosas peame looma lõuendi ja andma sellele id, et oleks võimalik graafiku kuvamise ajal sellele viidata, kui kasutate JavaScripti järgmises etapis.

jquery.min.js

Nüüd loome graafiku, kasutades allpool olevat JavaScripti koodi. Kõigepealt võtame lõuendi elemendi ja seejärel loome uue graafi objekti, edastame sellele lõuendi elemendi ja määratleme andmeobjekti, nagu allpool näha.

Pange tähele, et andmesildid on ühendatud siltide ja väärtusemuutujatega, mis tagastatakse lehelt lahkumisel, helistades Top Trending Twitter Hashtags

Top Trending Twitter Hashtags

failis get_chart_page.

kuidas saada krediitkaardiandmeid

Viimane osa on funktsioon, mis on konfigureeritud tegema Ajaxi päringu iga sekundi järel ja kutsuma URL app.py, mis käivitab /refreshData aastal refresh_graph_data ja see tagastab uued värskendatud andmed ning värskendab seejärel graafikut, mille uued andmed jätavad.

app.py

Käivitage rakendused koos

Käivitame kolm rakendust allpool toodud järjekorras: 1. Twitter App Client. 2. Säde rakendus. 3. Armatuurlaua veebirakendus.

Seejärel pääsete URL-i otsides juhtpaneelile reaalajas juurde

Nüüd näete oma graafiku värskendamist allpool:

Animatsioon: graafik Twitteris populaarsete räsimärkide kohta reaalajas

Apache'i voogesituse tegelik elu

Oleme õppinud tegema lihtsat andmete analüüsi reaalajas, kasutades Spark Streamingut, ja integreerima need otse lihtsa juhtpaneeliga, kasutades RESTful veebiteenust. Selle näite põhjal näeme, kui võimas on Spark, kuna see hõivab tohutu andmevoo, muudab seda ja eraldab väärtuslikku teavet, mida saab hõlpsalt kasutada lühikese aja jooksul otsuste langetamiseks. On palju kasulikke kasutusjuhtumeid, mida on võimalik rakendada ja mis võivad teenida erinevaid tööstusharusid, näiteks uudised või turundus.

Illustratsioon: räsimärke saab kasutada teabe ja väärtuse väljavõtmiseks, mida saab rakendada mitmel tööstusharul.

Uudistetööstuse näide

Saame jälgida kõige sagedamini mainitud räsimärke, et teada saada, millistel teemadel inimesed sotsiaalmeedias räägivad. Samuti saame jälgida konkreetseid räsimärke ja nende säutse, et teada saada, mida inimesed konkreetsete teemade või sündmuste kohta maailmas räägivad.

Turunduse näide

Saame koguda säutsu edastamist ja arvamusanalüüsi tehes kategoriseerida need ja määratleda inimeste huvid, et tuua neile nende huvidega seotud pakkumisi.

küberjulgeoleku probleemid ja lahendused

Samuti on palju kasutusjuhtumeid, mida saab rakendada spetsiaalselt analüütika jaoks. Suured andmed ja need võivad teenida paljusid tööstusharusid. Üldiste Apache Sparki kasutamise juhtumite jaoks soovitan teil vaadata ühte meie eelmised postitused .

Soovitan teil lugeda lähemalt Säde voogesitus siin selle võimaluste kohta lisateabe saamiseks ja andmete kasutamisel reaalajas lisateabe saamiseks täpsemate andmete teisendamiseks.

Armatuurlaua kujundus - kaalutlused ja parimad tavad

Ux Disain

Armatuurlaua kujundus - kaalutlused ja parimad tavad
Kuidas korraldada edukat tehnilist konverentsi: CordobaJS-i sündmus

Kuidas korraldada edukat tehnilist konverentsi: CordobaJS-i sündmus

Elustiil

Lemmik Postitused
MIDI õpetus: brauseripõhiste helirakenduste loomine, mida kontrollib MIDI riistvara
MIDI õpetus: brauseripõhiste helirakenduste loomine, mida kontrollib MIDI riistvara
Peatage prügikast: juhend pikka aega kestvate liideste kujundamiseks
Peatage prügikast: juhend pikka aega kestvate liideste kujundamiseks
Kujundus täpsusega - Adobe XD ülevaade
Kujundus täpsusega - Adobe XD ülevaade
MCMC meetodid: Metropolis-Hastings ja Bayesi järeldus
MCMC meetodid: Metropolis-Hastings ja Bayesi järeldus
Kuidas kujundada suurepäraseid kogemusi asjade Interneti jaoks
Kuidas kujundada suurepäraseid kogemusi asjade Interneti jaoks
 
Kuidas C ++ töötab: kompileerimise mõistmine
Kuidas C ++ töötab: kompileerimise mõistmine
Mida Game UX saab meile toote disaini kohta õpetada
Mida Game UX saab meile toote disaini kohta õpetada
CSS-i paigutuse õpetus: klassikalistest lähenemistest kuni uusimate tehnikateni
CSS-i paigutuse õpetus: klassikalistest lähenemistest kuni uusimate tehnikateni
Koodi optimeerimine: optimaalne viis optimeerimiseks
Koodi optimeerimine: optimaalne viis optimeerimiseks
Kujundame Facebooki ümber: 10 inspiratsiooni alustamiseks
Kujundame Facebooki ümber: 10 inspiratsiooni alustamiseks
Lemmik Postitused
  • kui palju on riskikapitalifirmasid
  • negatiivsete reaalsete intressimäärade olemasolu võib majandust stimuleerida, kui vastusena
  • jõudluse häälestamise tehnikad sql-serveris
  • andmekvaliteet andmelaos
  • mis on Adobe xd?
Kategooriad
Tehnoloogia Brändikujundus Finantsprotsessid Andmeteadus Ja Andmebaasid Toote Elutsükkel Muu Vilgas Talent Kpi-D Ja Analytics Kasumlikkus Ja Tõhusus Ux Disain

© 2021 | Kõik Õigused Kaitstud

apeescape2.com