串流裝飾器參考
cartesianProduct
cartesianProduct
函式會將具有多值欄位(即陣列)的單一元組轉換為多個元組,每個元組對應陣列欄位中的一個值。也就是說,如果給定一個單一元組,其中包含欄位 fieldA 的 N 個值陣列,cartesianProduct
函式將會輸出 N 個元組,每個元組都具有原始元組陣列中的一個值。基本上,您可以將陣列扁平化以進行進一步處理。
例如,使用 cartesianProduct
,您可以將這個元組
{
"fieldA": "foo",
"fieldB": ["bar","baz","bat"]
}
轉換為以下 3 個元組
{
"fieldA": "foo",
"fieldB": "bar"
}
{
"fieldA": "foo",
"fieldB": "baz"
}
{
"fieldA": "foo",
"fieldB": "bat"
}
cartesianProduct 參數
-
incoming stream
:(強制)單一輸入串流。 -
fieldName or evaluator
:(強制)要扁平化其值的欄位名稱,或應扁平化其結果的評估器。 -
productSort='fieldName ASC|DESC'
:(選用)新產生的元組的排序順序。
cartesianProduct 語法
cartesianProduct(
<stream>,
<fieldName | evaluator> [as newFieldName],
productSort='fieldName ASC|DESC'
)
cartesianProduct 範例
下列範例顯示此來源元組的不同輸出
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
}
單一欄位,無排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
單一評估器,無排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
單一欄位,依值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
單一評估器,依評估器值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
productSort="newFieldE desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
重新命名單一欄位,依值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB as newFieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB2",
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB1",
}
多個欄位,無排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
多個欄位,依單一欄位排序
cartesianProduct(
search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc"
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
多個欄位,依多個欄位排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc, fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
欄位與評估器,無排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
fieldB
)
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 14
}
如您在上述範例中所見,cartesianProduct
函式確實支援跨多個欄位和/或評估器扁平化元組。
classify
classify
函式使用邏輯迴歸文字分類模型來分類元組。它專門設計用於使用train
函式訓練的模型。classify
函式使用model
函式來檢索儲存的模型,然後使用該模型對元組流進行評分。分類器讀取的元組必須包含一個可用於分類的文字欄位。classify
函式使用 Lucene 分析器從文字中提取特徵,以便應用模型。預設情況下,classify
函式會使用元組中文字欄位的名稱來尋找分析器。如果工作節點上的 Solr schema 不包含此欄位,則可以透過指定 analyzerField
參數在另一個欄位中尋找分析器。
每個分類的元組都會被賦予兩個分數
-
probability_d*:介於 0 和 1 之間的浮點數,描述元組屬於該類別的機率。這在分類用例中很有用。
-
score_d*:文件的分數,未壓縮在 0 和 1 之間。分數可能是正數或負數。分數越高,文件越符合該類別。這種未壓縮的分數在查詢重新排名和推薦用例中很有用。當多個高排名文件的 probability_d 分數為 1 時,此分數尤其有用,這樣無法在文件之間提供有意義的排名。
classify 參數
-
model expression
: (必填) 檢索儲存的邏輯迴歸模型。 -
field
: (必填) 要將分類器應用於的元組中的欄位。預設情況下,將使用 schema 中此欄位的分析器來提取特徵。 -
analyzerField
: (選填) 指定 schema 中另一個欄位來尋找分析器。
classify 語法
classify(model(modelCollection,
id="model1",
cacheMillis=5000),
search(contentCollection,
q="id:(a b c)",
qt="/export",
fl="text_t, id",
sort="id asc"),
field="text_t")
在上面的範例中,classify expression
使用 api
函式檢索模型。然後,它會分類 search
函式傳回的元組。text_t
欄位用於文字分類,而 Solr schema 中 text_t
欄位的分析器用於分析文字並提取特徵。
commit
commit
函式包裝單個串流 (A),並在批次大小滿足或到達串流結尾時,向集合傳送 commit 訊息,給定集合和批次大小。commit 串流最常與更新串流一起使用,因此 commit 會考慮來自更新串流的可能摘要元組。所有進入 commit 串流的元組都將從 commit 串流中傳回 - 不會刪除任何元組,也不會新增任何元組。
commit 參數
-
collection
:要向其傳送 commit 訊息的集合 (必填) -
batchSize
:commit 批次大小,當達到批次大小時傳送 commit 訊息。如果未提供 (或提供的值為 0),則僅在傳入串流結束時才傳送 commit。 -
waitFlush
:直接傳遞給 commit 處理常式的數值 (true/false,預設值:false) -
waitSearcher
:直接傳遞給 commit 處理常式的數值 (true/false,預設值:false) -
softCommit
:直接傳遞給 commit 處理常式的數值 (true/false,預設值:false) -
StreamExpression for StreamA
(必填)
complement
complement
函式包裝兩個串流 (A 和 B),並從 A 發出 B 中不存在的元組。元組按照它們在串流 A 中出現的順序發出。兩個串流都必須依用於確定相等性的欄位排序 (使用 on
參數)。
complement 參數
-
StreamExpression for StreamA
-
StreamExpression for StreamB
-
on
:用於檢查 A 和 B 之間元組相等性的欄位。格式可以是on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
complement 語法
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
daemon
daemon
函式包裝另一個函式,並使用內部執行緒以間隔執行它。daemon
函式可用於提供連續推送和拉取串流。
連續推送串流
使用連續推送串流,daemon
函式包裝另一個函式,然後傳送到 /stream
處理常式以執行。/stream
處理常式會識別 daemon
函式並將其保留在記憶體中,以便它可以按間隔執行其內部函式。
為了方便推送元組,daemon
函式必須包裝另一個將元組推送到某處的串流修飾符。其中一個範例是 update
函式,它包裝串流並將元組傳送到另一個 SolrCloud 集合以進行索引。
daemon 語法
daemon(id="uniqueId",
runInterval="1000",
terminate="true",
update(destinationCollection,
batchSize=100,
topic(checkpointCollection,
topicCollection,
q="topic query",
fl="id, title, abstract, text",
id="topicId",
initialCheckpoint=0)
)
)
上面的範例程式碼顯示 daemon
函式包裝 update
函式,而 update
函式又包裝 topic
函式。當此表達式傳送到 /stream
處理常式時,/stream
處理常式會看到 daemon
函式並將其保留在記憶體中,並以間隔執行。在此特定範例中,daemon
函式將每秒執行 update
函式。update
函式包裝 topic
函式,該函式將分批串流與 topic
函式查詢相符的元組。對主題的每個後續呼叫都會傳回該主題的下一批元組。update
函式會將所有與主題匹配的元組傳送到另一個集合以進行索引。terminate
參數會告知守護程式在 topic
函式停止傳送元組時終止。
這樣做的效果是將符合特定查詢的文件推送到另一個集合中。自訂推送函式可以插入,將文件從 Solr 推送到其他系統,例如 Kafka 或電子郵件系統。
推送串流也可用於連續背景彙總場景,在這些場景中,彙總會以間隔在背景中匯總並推送到其他 Solr 集合。另一個用例是連續背景機器學習模型最佳化,其中將最佳化的模型推送到另一個 Solr 集合,以便將其整合到查詢中。
/stream
處理常式支援一小組指令,用於列出和控制守護程式函式
https://127.0.0.1:8983/solr/collection/stream?action=list
此命令將提供特定節點上目前正在執行的守護程式清單,以及它們的目前狀態。
https://127.0.0.1:8983/solr/collection/stream?action=stop&id=daemonId
此命令將停止特定的守護程式函式,但將其保留在記憶體中。
https://127.0.0.1:8983/solr/collection/stream?action=start&id=daemonId
此命令將啟動已停止的特定守護程式函式。
https://127.0.0.1:8983/solr/collection/stream?action=kill&id=daemonId
此命令將停止特定的守護程式函式並將其從記憶體中移除。
連續拉取串流
DaemonStream java 類別 (SolrJ 程式庫的一部分) 也可以嵌入到 java 應用程式中,以提供連續拉取串流。範例程式碼
StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello"); // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents
TopicStream topicStream = new TopicStream(zkHost, // Host address for the ZooKeeper service housing the collections
"checkpoints", // The collection to store the topic checkpoints
"topicData", // The collection to query for the topic records
"topicId", // The id of the topic
-1, // checkpoint every X tuples, if set -1 it will checkpoint after each run.
topicQueryParams); // The query parameters for the TopicStream
DaemonStream daemonStream = new DaemonStream(topicStream, // The underlying stream to run.
"daemonId", // The id of the daemon
1000, // The interval at which to run the internal stream
500); // The internal queue size for the daemon stream. Tuples will be placed in the queue
// as they are read by the internal thread.
// Calling read() on the daemon stream reads records from the internal queue.
daemonStream.setStreamContext(context);
daemonStream.open();
//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
// The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
//Do something with the tuples
}
// Shutdown the DaemonStream.
daemonStream.shutdown();
//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.
while(true) {
Tuple tuple = daemonStream.read();
if(tuple.EOF) {
break;
} else {
//Do something with the tuples.
}
}
//Finally close the stream
daemonStream.close();
delete
delete
函式包裝其他函式,並使用找到的 id
和 _version_
值將元組作為依 ID 刪除命令傳送到 SolrCloud 集合。
這與下面描述的 update()
函式類似。
delete 參數
-
destinationCollection
: (必填) 將刪除元組的集合。 -
batchSize
: (選填,預設值為250
) 刪除批次大小。 -
pruneVersionField
: (選填,預設值為false
) 是否從元組中修剪_version_
值 -
StreamExpression
: (必填)
delete 語法
delete(collection1,
batchSize=500,
search(collection1,
q=old_data:true,
qt="/export",
fl="id",
sort="a_f asc, a_i asc"))
上面的範例會取用 search
函式針對 collection1
傳回的元組,並將每個找到文件的 id
值轉換為針對同一 collection1
的刪除請求。
與 希望忽略並行更新並刪除所有匹配文件的使用者應設定 預期會有並行更新,並希望「跳過」任何失敗刪除的使用者,應考慮設定 |
eval
eval
函式允許在使用案例中,動態產生新的串流表達式,然後進行評估。eval
函式包裝一個串流表達式,並從基礎串流中讀取單個元組。然後,eval
函式會從元組的 expr_s
欄位中檢索字串串流表達式。然後,eval
函式會編譯字串串流表達式並發出元組。
executor
executor
函式包裝包含串流表達式的串流來源,並平行執行這些表達式。executor
函式會在每個元組的 expr_s
欄位中尋找表達式。executor
函式有一個內部執行緒集區,會在相同工作節點上平行執行編譯和執行表達式的任務。此函式也可以透過將其包裝在 parallel
函式中,跨工作節點平行化,以提供跨叢集平行執行表達式。
executor
函式並不會對其執行的表達式輸出進行任何特定的處理。因此,執行的表達式必須包含將元組推送到其目標位置的邏輯。 update 函式可以包含在正在執行的表達式中,以便將元組發送到 SolrCloud 集合進行儲存。
此模型允許非同步執行作業,其中輸出儲存在 SolrCloud 集合中,以便在作業進行時存取。
fetch
fetch
函式會迭代串流並提取額外的欄位,然後將它們添加到元組中。fetch
函式會分批提取,以限制回呼 Solr 的次數。從 fetch
函式串流的元組將包含原始欄位和提取的額外欄位。 fetch
函式支援一對一提取。多對一提取(串流來源包含重複鍵)也可以運作,但此函式目前不支援一對多提取。
having
having
表達式會包裝一個串流,並對每個元組套用布林運算。它只會發出布林運算返回 true 的元組。
leftOuterJoin
leftOuterJoin
函式會包裝兩個串流,左 (Left) 和右 (Right),並從左串流 (Left) 發出元組。如果右串流 (Right) 中存在相等的元組(由 on
定義),則該元組中的值將包含在發出的元組中。左元組 (Left) 不一定要存在右串流 (Right) 中相等的元組才能發出。這支援一對一、一對多、多對一和多對多左外部聯結案例。元組會按照它們在左串流 (Left) 中出現的順序發出。兩個串流都必須依用於判斷相等性(使用 on
參數)的欄位排序。如果兩個元組都包含名稱相同的欄位,則發出的元組中將使用右串流 (Right) 的值。
您可以將傳入的串流包裝在 select
函式中,以指定在發出的元組中包含哪些欄位值。
leftOuterJoin 參數
-
StreamLeft 的 StreamExpression
-
StreamRight 的 StreamExpression
-
on
: 用於檢查左 (Left) 和右 (Right) 之間元組相等性的欄位。可以採用on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
的格式。
leftOuterJoin 語法
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
hashJoin
hashJoin
函式會包裝兩個串流,左 (Left) 和右 (Right),對於左串流 (Left) 中存在於右串流 (Right) 中的每個元組,都會發出一個包含兩個元組欄位的元組。這支援一對一、一對多、多對一和多對多內部聯結案例。元組會按照它們在左串流 (Left) 中出現的順序發出。串流的順序無關緊要。如果兩個元組都包含名稱相同的欄位,則發出的元組中將使用右串流 (Right) 的值。
您可以將傳入的串流包裝在 select
函式中,以指定在發出的元組中包含哪些欄位值。
當左 (Left) 和右 (Right) 的元組無法按相同順序排列時,可以使用 hashJoin 函式。由於元組的順序不一致,此串流函式會在 open 作業期間讀取右串流 (Right) 中的所有值,並將所有元組儲存在記憶體中。這會導致記憶體用量等於右串流 (Right) 的大小。
hashJoin 參數
-
StreamLeft 的 StreamExpression
-
hashed=StreamRight 的 StreamExpression
-
on
: 用於檢查左 (Left) 和右 (Right) 之間元組相等性的欄位。可以採用on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
的格式。
hashJoin 語法
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
hashJoin(
search(people, q="*:*", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
innerJoin
包裝兩個串流,左 (Left) 和右 (Right)。對於左串流 (Left) 中存在於右串流 (Right) 中的每個元組,都會發出一個包含兩個元組欄位的元組。這支援一對一、一對多、多對一和多對多內部聯結案例。元組會按照它們在左串流 (Left) 中出現的順序發出。兩個串流都必須依用於判斷相等性('on' 參數)的欄位排序。如果兩個元組都包含名稱相同的欄位,則發出的元組中將使用右串流 (Right) 的值。您可以將傳入的串流包裝在 select(…)
表達式中,以指定在發出的元組中包含哪些欄位值。
innerJoin 參數
-
StreamLeft 的 StreamExpression
-
StreamRight 的 StreamExpression
-
on
: 用於檢查左 (Left) 和右 (Right) 之間元組相等性的欄位。可以採用on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
的格式。
innerJoin 語法
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
intersect
intersect
函式會包裝兩個串流,A 和 B,並發出 DO 存在於 B 中的 A 元組。元組會按照它們在串流 A 中出現的順序發出。兩個串流都必須依用於判斷相等性(on
參數)的欄位排序。只會發出 A 的元組。
intersect 參數
-
StreamExpression for StreamA
-
StreamExpression for StreamB
-
on
:用於檢查 A 和 B 之間元組相等性的欄位。格式可以是on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
。
intersect 語法
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
list
list
函式會包裝 N 個串流表達式,並依序開啟和迭代每個串流。這會產生串連多個串流表達式結果的效果。
list 語法
list(tuple(a="hello world"), tuple(a="HELLO WORLD"))
list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
merge
merge
函式會合併兩個或多個串流表達式,並維護底層串流的順序。由於維護了順序,因此底層串流的排序必須與提供給 merge 函式的 on 參數一致。
merge 參數
-
串流表達式 A
-
串流表達式 B
-
選填串流表達式 C,D,….Z
-
on
: 用於執行合併的排序條件。格式為fieldName order
,其中 order 為asc
或desc
。可以提供多個欄位,格式為fieldA order, fieldB order
。
merge 語法
# Merging two stream expressions together
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,fieldA,fieldB,fieldC",
sort="fieldA asc, fieldB desc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,fieldA",
sort="fieldA asc"),
search(collection2,
q="id:(10 11 13)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
search(collection3,
q="id:(987)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
on="fieldA asc")
null
null 表達式是一個有用的實用函式,用於了解執行平行關係代數 (聯結、交集、彙總等) 時的瓶頸。null 函式會讀取底層串流中的所有元組,並傳回包含計數和處理時間的單個元組。由於 null 串流本身增加的額外負擔很小,因此可以用於隔離 Solr /export
處理常式的效能。如果 /export 處理常式的效能不是瓶頸,則瓶頸很可能發生在執行串流裝飾器的背景工作節點中。
null 表達式可以由 parallel 函式包裝並傳送到背景工作節點。在這種情況下,每個背景工作節點都會傳回一個包含在背景工作節點上處理的元組計數和該背景工作節點的計時資訊的元組。這會提供有價值的資訊,例如
-
隨著更多背景工作節點的加入,/export 處理常式的效能是否提高。
-
元組是否均勻地分佈在各個背景工作節點上,或者雜湊分割是否將更多文件傳送到單個背景工作節點。
-
所有背景工作節點是否以相同的速度處理資料,或者其中一個背景工作節點是否是瓶頸的來源。
outerHashJoin
outerHashJoin
函式會包裝兩個串流,左 (Left) 和右 (Right),並從左串流 (Left) 發出元組。如果右串流 (Right) 中存在相等的元組(由 on
參數定義),則該元組中的值將包含在發出的元組中。左元組 (Left) 不一定要存在右串流 (Right) 中相等的元組才能發出。這支援一對一、一對多、多對一和多對多左外部聯結案例。元組會按照它們在左串流 (Left) 中出現的順序發出。串流的順序無關緊要。如果兩個元組都包含名稱相同的欄位,則發出的元組中將使用右串流 (Right) 的值。
您可以將傳入的串流包裝在 select
函式中,以指定在發出的元組中包含哪些欄位值。
當左 (Left) 和右 (Right) 的元組無法按相同順序排列時,可以使用 outerHashJoin 串流。由於元組的順序不一致,此串流會在 open 作業期間讀取右串流 (Right) 中的所有值,並將所有元組儲存在記憶體中。這會導致記憶體用量等於右串流 (Right) 的大小。
outerHashJoin 參數
-
StreamLeft 的 StreamExpression
-
hashed=StreamRight 的 StreamExpression
-
on
: 用於檢查左 (Left) 和右 (Right) 之間元組相等性的欄位。可以採用on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
的格式。
outerHashJoin 語法
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
parallel
parallel
函式會包裝一個串流表達式,並將其傳送到 N 個背景工作節點進行平行處理。
parallel
函式要求為底層搜尋提供 partitionKeys
參數。 partitionKeys
參數會跨背景工作節點分割搜尋結果(元組)。具有相同 partitionKeys
值的元組將會被隨機分派到相同的背景工作節點。
parallel
函式會維護背景工作節點傳回之元組的排序順序,因此排序條件必須包含背景工作節點傳回之元組的排序順序。
例如,如果您按年、月和日排序,則只要有足夠的不同年份將元組分散到各個背景工作節點,您就可以只按年份分割。
Solr 允許根據超過 4 個欄位進行排序,但考量到速度,您不能指定超過 4 個 partitionKeys
。此外,當一或兩個鍵就足以分散元組時,指定過多的 partitionKeys
也屬多餘。
當底層搜尋串流會從集合發出大量元組時,就會設計平行串流。如果搜尋串流僅使用 parallel
發出來自集合的一小部分資料子集,則可能會較慢。
工作節點集合
工作節點可以來自與資料相同的集合,或者它們可以完全是不同的集合,甚至是僅為了 |
平行參數
-
collection
:要將 StreamExpression 傳送到的工作節點集合名稱。 -
StreamExpression
:要傳送到工作節點集合的運算式。 -
workers
:要將運算式傳送到的工作節點集合中的工作節點數。 -
zkHost
:(選用)工作節點集合所在的 ZooKeeper 連線字串。僅當使用與您連線的 Solr 實例相同的 ZkHost 時(chroot
可以不同),才會包含 Zookeeper 認證和 ACL。 -
sort
:用於排序工作節點傳回的元組的排序條件。
平行語法
parallel(workerCollection,
rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
over="year_i", count(*)),
workers="20",
zkHost="localhost:9983",
sort="year_i desc")
上面的運算式顯示 parallel
函式封裝了 rollup
函式。這會導致 rollup
函式在 20 個工作節點上平行執行。
預熱
對於具有相同數量的工作節點和
|
plist
plist
函式會封裝 N 個串流運算式,並平行開啟串流,然後循序迭代每個串流。list
和 plist
的區別在於,串流是平行開啟的。由於許多串流(例如 facet
、stats
和 significantTerms
)在開啟時會將繁重的運算推送到 Solr,因此 plist 函式可以透過平行執行這些運算來顯著提高效能。
plist 語法
plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))
plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
priority
priority
函式是 executor 函式的簡單優先順序排程器。executor
函式沒有直接的任務優先順序概念;它只是按照從底層串流讀取任務的順序執行任務。priority
函式提供在較早提交的較低優先順序任務之前排程較高優先順序任務的能力。
priority
函式封裝了兩個 topic
函式,這兩個函式都會發出包含要執行的串流運算式的元組。第一個主題被視為較高優先順序的任務佇列。
每次呼叫 priority
函式時,它都會檢查較高優先順序的任務佇列,以查看是否有任何任務要執行。如果較高優先順序的佇列中有任務在等待,則 priority 函式將會發出較高優先順序的任務。如果沒有要執行的較高優先順序任務,則會發出較低優先順序佇列中的任務。
priority
函式每次呼叫時只會從其中一個佇列發出一批任務。這可確保在較高優先順序佇列中沒有要執行的任務之前,不會執行任何較低優先順序的任務。
priority 語法
daemon(id="myDaemon",
executor(threads=10,
priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))
在上面的範例中,daemon
函式會迭代呼叫 executor。每次呼叫時,executor
函式都會執行 priority
函式發出的任務。priority
函式封裝了兩個主題。第一個主題是較高優先順序的任務佇列,第二個主題是較低優先順序的主題。
reduce
reduce
函式會封裝內部串流,並按常見欄位分組元組。
每個元組組都會透過可外掛的 reduce 運算,作為單一區塊進行運算。Solr 提供的分組運算實作了分散式分組功能。分組運算也作為一個範例 reduce 運算,在建置自訂 reduce 運算時可供參考。
reduce 函式依賴底層串流的排序順序。因此,底層串流的排序順序必須與 group by 欄位對齊。 |
rollup
rollup
函式會封裝另一個串流函式,並在 bucket 欄位上彙總總計值。rollup 函式依賴底層串流的排序順序,以便一次彙總一個分組的總計值。因此,底層串流的排序順序必須與 rollup 函式的 over
參數中的欄位相符。
rollup 函式也需要處理整個結果集,才能執行其彙總。當底層串流是 search
函式時,可以使用 /export
處理常式將完整排序的結果集提供給 rollup 函式。這種排序方法允許 rollup 函式對非常高基數的欄位執行彙總。這種方法的缺點是,必須將元組排序並透過網路串流到工作節點進行彙總。為了更快地彙總低到中等基數的欄位,可以使用 facet
函式。
rollup 參數
-
StreamExpression
(必要) -
over
:(必要)要依據分組的欄位清單。 -
metrics
:(必要)要計算的指標清單。目前支援的指標為sum(col)
、avg(col)
、min(col)
、max(col)
、count(*)
。
rollup 語法
rollup(
search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
over="a_s",
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
count(*)
)
上面的範例顯示了封裝 search 函式的 rollup 函式。請注意,search 函式使用 /export
處理常式將整個結果集提供給 rollup 串流。另請注意,search 函式的 sort
參數與 rollup 的 over
參數相符。這允許 rollup 函式一次一個分組地在 a_s
欄位上彙總。
scoreNodes
請參閱 圖形遍歷中的章節。
select
select
函式會封裝串流運算式,並輸出包含來自傳入元組的欄位子集或修改集合的元組。輸出元組中包含的欄位清單可以包含別名,以有效地重新命名欄位。select
串流支援運算和評估器。您可以提供運算和評估器的清單,以在任何欄位上執行,例如 replace、add、if
等。
select 參數
-
StreamExpression
-
fieldName
:要包含在輸出元組中的欄位名稱(可以包含多個),例如outputTuple[fieldName] = inputTuple[fieldName]
。fieldName
可以是萬用字元模式,例如a_*
以選取所有以a_
開頭的欄位。 -
fieldName as aliasFieldName
:要包含在輸出元組中的別名欄位名稱(可以包含多個),例如outputTuple[aliasFieldName] = incomingTuple[fieldName]
-
replace(fieldName, value, withValue=replacementValue)
:如果incomingTuple[fieldName] == value
,則outgoingTuple[fieldName]
會設定為replacementValue
。value
可以是字串「null」,以將空值取代為其他值。 -
replace(fieldName, value, withField=otherFieldName)
:如果incomingTuple[fieldName] == value
,則outgoingTuple[fieldName]
會設定為incomingTuple[otherFieldName]
的值。value
可以是字串「null」,以將空值取代為其他值。
select 語法
// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
teamName_s as teamName,
wins,
losses,
replace(wins,null,withValue=0),
replace(losses,null,withValue=0),
if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)
sort
sort
函式會封裝串流運算式,並重新排序元組。sort 函式會以新的排序順序發出所有傳入的元組。sort 函式會從傳入的串流讀取所有元組,使用具有 O(nlog(n))
效能特性的演算法重新排序它們,其中 n 是傳入串流中元組的總數,然後以新的排序順序輸出元組。由於所有元組都會讀取到記憶體中,因此此函式的記憶體消耗會隨著傳入串流中元組的數量線性成長。
sort 語法
下面的運算式會尋找狗主人,並依據主人和寵物名稱排序結果。請注意,它會先依據人員/擁有者 ID 排序,然後再依據擁有者和寵物名稱重新排序最終輸出,以使用有效率的 innerJoin。
sort(
innerJoin(
search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
on="id=owner"
),
by="name asc, petName asc"
)
unique
unique
函式會封裝串流運算式,並根據 over
參數發出唯一的元組串流。unique 函式依賴底層串流的排序順序。over
參數必須與底層串流的排序順序相符。
unique 函式實作了非並置的 unique 演算法。這表示具有相同 unique over
欄位的記錄不需要並置在相同的分片上。當平行執行時,partitionKeys
參數必須與 unique over
欄位相同,以便將具有相同索引鍵的記錄洗牌到相同的工作節點。
更新
update
函數會包裝另一個函數,並將元組傳送到 SolrCloud 集合以索引為文件。
更新參數
-
destinationCollection
: (必填) 將索引元組的集合。 -
batchSize
: (選填,預設為250
) 索引批次大小。 -
pruneVersionField
: (選填,預設為true
) 是否從元組中刪除_version_
值 -
StreamExpression
: (必填)
更新語法
update(destinationCollection,
batchSize=500,
search(collection1,
q=*:*,
qt="/export",
fl="id,a_s,a_i,a_f,s_multi,i_multi",
sort="a_f asc, a_i asc"))
上面的範例將 search
函數傳回的元組傳送到 destinationCollection
以進行索引。
如本範例所示包裝 search(…)
是此裝飾器的常見用法:從集合中讀取文件作為元組,以某種方式處理或修改它們,然後將它們添加回新的集合。因此,pruneVersionField=true
是預設行為 — 在將元組轉換為 Solr 文件時,去除在內部串流中找到的任何 _version_
值,以防止 樂觀並行約束產生的任何意外錯誤。