Source code for intake_solr.source

from intake.source import base
import pandas as pd
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
import pysolr
from . import __version__


class SOLRSequenceSource(base.DataSource):
    """Execute a query on SOLR

    Parameters
    ----------
    query: str
        Query to execute, in Lucene syntax, e.g., ``"*:*"``
    base_url: str
        Connection on which to reach SOLR, including protocol (http), server,
        port and base path. If using Zookeeper, this should be the full
        comma-separated list of service:port/path elements.
    core: str
        Named segment of the SOLR storage to query
    qargs: dict
        Further parameters to pass with the query (e.g., highlighting)
    metadata: dict
        Additional information to associate with this source
    auth: None, "kerberos" or (username, password)
        Authentication to attach to requests
    cert: str or None
        Path to SSL certificate, if required
    zoocollection: bool or str
        If using Zookeeper to orchestrate SOLR, this is the name of the
        collection to connect to.
    """
    container = 'python'
    name = 'solr'
    version = __version__
    partition_access = False

    def __init__(self, query, base_url, core, qargs=None, metadata=None,
                 auth=None, cert=None, zoocollection=False):
        self.query = query
        self.qargs = qargs or {}
        self.metadata = metadata or {}
        self._schema = None
        if auth == 'kerberos':
            auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
                                    sanitize_mutual_error_response=False)
        if zoocollection:
            url = ','.join(['/'.join([b, core]) for b in base_url.split(',')])
            zoo = pysolr.ZooKeeper(url)
            if auth or cert:
                self.solr = pysolr.SolrCloud(zoo, zoocollection, auth=auth,
                                             verify=cert)
            else:
                # conda-released pysolr does not support auth=
                self.solr = pysolr.SolrCloud(zoo, zoocollection)
        else:
            url = '/'.join([base_url, core])
            if auth or cert:
                self.solr = pysolr.Solr(url, auth=auth, verify=cert)
            else:
                # conda-released pysolr does not support auth=
                self.solr = pysolr.Solr(url)
        super(SOLRSequenceSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        return base.Schema(datashape=None,
                           dtype=None,
                           shape=None,
                           npartitions=1,
                           extra_metadata={})

    def _do_query(self):
        out = []
        data = self.solr.search(self.query, **self.qargs).docs
        for d in data:
            out.append({k: (v[0] if isinstance(v, (tuple, list)) else v)
                        for k, v in d.items()})
        return out

    def _get_partition(self, _):
        """Downloads all data"""
        return self._do_query()
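
A minimal usage sketch for ``SOLRSequenceSource`` (not part of the module above). The endpoint, core name and query are illustrative assumptions, and the sketch presumes a reachable Solr server plus intake's standard ``discover()`` and ``read()`` entry points::

    from intake_solr.source import SOLRSequenceSource

    # Hypothetical connection details; adjust for a real Solr deployment
    source = SOLRSequenceSource(query='*:*',
                                base_url='http://localhost:8983/solr',
                                core='mycore')
    print(source.discover())   # Schema with npartitions=1, dtype unknown
    docs = source.read()       # assumed intake entry point; list of dicts
    print(len(docs), 'documents retrieved')

For Kerberos-protected clusters, passing ``auth='kerberos'`` makes the source build an ``HTTPKerberosAuth`` handler, as shown in ``__init__`` above.
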
class SOLRTableSource(SOLRSequenceSource):
    """Execute a query on SOLR, return as dataframe

    Parameters
    ----------
    query: str
        Query to execute, in Lucene syntax, e.g., ``"*:*"``
    base_url: str
        Connection on which to reach SOLR, including protocol (http), server,
        port and base path. If using Zookeeper, this should be the full
        comma-separated list of service:port/path elements.
    core: str
        Named segment of the SOLR storage to query
    qargs: dict
        Further parameters to pass with the query (e.g., highlighting)
    metadata: dict
        Additional information to associate with this source
    auth: None, "kerberos" or (username, password)
        Authentication to attach to requests
    cert: str or None
        Path to SSL certificate, if required
    zoocollection: bool or str
        If using Zookeeper to orchestrate SOLR, this is the name of the
        collection to connect to.
    """
    container = 'dataframe'

    def _get_schema(self, retry=2):
        """Get schema from first 10 hits or cached dataframe"""
        if not hasattr(self, '_dataframe'):
            self._get_partition(0)
        dtype = {k: str(v)
                 for k, v in self._dataframe.dtypes.to_dict().items()}
        return base.Schema(datashape=None,
                           dtype=dtype,
                           shape=self._dataframe.shape,
                           npartitions=1,
                           extra_metadata={})

    def _get_partition(self, _):
        """Downloads all data"""
        if not hasattr(self, '_dataframe'):
            df = pd.DataFrame(self._do_query())
            self._dataframe = df
            self._schema = None
            self.discover()
        return self._dataframe

    def _close(self):
        self._dataframe = None
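
A corresponding sketch for the dataframe variant (again illustrative names, not part of the module): ``SOLRTableSource`` materialises the same query as a pandas DataFrame and caches it, so ``discover()`` can report concrete column dtypes and shape. It assumes the same intake entry points as above::

    from intake_solr.source import SOLRTableSource

    source = SOLRTableSource(query='*:*',
                             base_url='http://localhost:8983/solr',
                             core='mycore')
    info = source.discover()   # dtype and shape taken from the cached dataframe
    df = source.read()         # assumed intake entry point; pandas DataFrame
    source.close()             # assumed to call _close, dropping the cache

Because ``_get_partition`` caches the full result on the instance, repeated reads return the same in-memory dataframe until the cache is cleared via ``_close``.
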